You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/07/19 20:21:20 UTC

[1/7] mesos git commit: Performance optimizations for message passing.

Repository: mesos
Updated Branches:
  refs/heads/master 4e6eb4ec5 -> 8f42d0c11


Performance optimizations for message passing.

Optimizations include:

* Factored out run queue and introduced lock free implementation. This
  also required adding the concept of an `epoch` to support proper
  settling and refactoring the increments/decrements of `running` to
  make it easier to reason about.

* Replaced the use of a condition variable (the `Gate`) with a
  semaphore.

Review: https://reviews.apache.org/r/60825


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6076dbc2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6076dbc2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6076dbc2

Branch: refs/heads/master
Commit: 6076dbc226de80d597a8e21ea392ecf4ef3027c1
Parents: 4e6eb4e
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Jun 18 13:34:45 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:39 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   2 +
 .../libprocess/cmake/ProcessConfigure.cmake     |   4 +
 3rdparty/libprocess/configure.ac                |  10 +-
 3rdparty/libprocess/src/CMakeLists.txt          |   2 +
 3rdparty/libprocess/src/process.cpp             | 213 +++++++++++--------
 3rdparty/libprocess/src/run_queue.hpp           | 182 ++++++++++++++++
 3rdparty/libprocess/src/semaphore.hpp           | 183 ++++++++++++++++
 7 files changed, 501 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index fb08b6a..4031297 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -180,6 +180,8 @@ libprocess_la_SOURCES =		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\
+  src/run_queue.hpp		\
+  src/semaphore.hpp		\
   src/socket.cpp		\
   src/subprocess.cpp		\
   src/subprocess_posix.cpp	\

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/cmake/ProcessConfigure.cmake
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/cmake/ProcessConfigure.cmake b/3rdparty/libprocess/cmake/ProcessConfigure.cmake
index dd3be57..f9e70bd 100755
--- a/3rdparty/libprocess/cmake/ProcessConfigure.cmake
+++ b/3rdparty/libprocess/cmake/ProcessConfigure.cmake
@@ -40,6 +40,10 @@
 # party libraries, and where in the directory tree you need to look to get the
 # actual libraries.
 
+if (ENABLE_LOCK_FREE_RUN_QUEUE)
+  add_definitions(-DLOCK_FREE_RUN_QUEUE)
+endif ()
+
 if (ENABLE_SSL)
   find_package(OpenSSL REQUIRED)
 endif ()

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index e849b7d..cb2cf4f 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -69,6 +69,11 @@ AC_CONFIG_FILES([3rdparty/gmock_sources.cc])
 # Optional features.
 ###############################################################################
 
+AC_ARG_ENABLE([lock_free_run_queue],
+              AS_HELP_STRING([--enable-lock-free-run-queue],
+                             [enables the lock free run queue]),
+                             [], [enable_lock_free_run_queue=no])
+
 AC_ARG_ENABLE([hardening],
               AS_HELP_STRING([--disable-hardening],
                              [disables security measures such as stack
@@ -230,11 +235,14 @@ AC_ARG_WITH([svn],
 ###############################################################################
 AC_ARG_VAR([TEST_DRIVER], [executable and arguments of a test driver])
 
-
 ###############################################################################
 # Compiler checks.
 ###############################################################################
 
+# Check if we should use the lock free run queue.
+AS_IF([test "x$enable_lock_free_run_queue" = "xyes"],
+      [AC_DEFINE([LOCK_FREE_RUN_QUEUE])])
+
 # Check to see if we should harden or not.
 AM_CONDITIONAL([ENABLE_HARDENING], [test x"$enable_hardening" = "xyes"])
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/CMakeLists.txt b/3rdparty/libprocess/src/CMakeLists.txt
index 670dc15..03959ae 100644
--- a/3rdparty/libprocess/src/CMakeLists.txt
+++ b/3rdparty/libprocess/src/CMakeLists.txt
@@ -55,6 +55,8 @@ set(PROCESS_SRC
   process.cpp
   process_reference.hpp
   reap.cpp
+  run_queue.hpp
+  semaphore.hpp
   socket.cpp
   subprocess.cpp
   time.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b4d7791..dff78b0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -115,6 +115,7 @@
 #include "event_loop.hpp"
 #include "gate.hpp"
 #include "process_reference.hpp"
+#include "run_queue.hpp"
 
 namespace inet = process::network::inet;
 namespace inet4 = process::network::inet4;
@@ -571,9 +572,11 @@ private:
   // Gates for waiting threads (protected by processes_mutex).
   map<ProcessBase*, Gate*> gates;
 
-  // Queue of runnable processes (implemented using list).
-  list<ProcessBase*> runq;
-  std::recursive_mutex runq_mutex;
+  // Queue of runnable processes.
+  //
+  // See run_queue.hpp for more information about the RUN_QUEUE
+  // preprocessor definition.
+  RUN_QUEUE runq;
 
   // Number of running processes, to support Clock::settle operation.
   std::atomic_long running;
@@ -593,6 +596,7 @@ private:
   std::atomic_bool finalizing;
 };
 
+
 static internal::Flags* libprocess_flags = new internal::Flags();
 
 // Synchronization primitives for `initialize`.
@@ -627,9 +631,6 @@ static SocketManager* socket_manager = nullptr;
 // Active ProcessManager (eventually will probably be thread-local).
 static ProcessManager* process_manager = nullptr;
 
-// Scheduling gate that threads wait at when there is nothing to run.
-static Gate* gate = new Gate();
-
 // Used for authenticating HTTP requests.
 static AuthenticatorManager* authenticator_manager = nullptr;
 
@@ -2756,7 +2757,7 @@ void ProcessManager::finalize()
 
   // Send signal to all processing threads to stop running.
   joining_threads.store(true);
-  gate->open();
+  runq.decomission();
   EventLoop::stop();
 
   // Join all threads.
@@ -2811,43 +2812,29 @@ long ProcessManager::init_threads()
 
   threads.reserve(num_worker_threads + 1);
 
-  struct
-  {
-    void operator()() const
-    {
-      do {
-        ProcessBase* process = process_manager->dequeue();
-        if (process == nullptr) {
-          Gate::state_t old = gate->approach();
-          process = process_manager->dequeue();
-          if (process == nullptr) {
-            if (joining_threads.load()) {
-              break;
-            }
-            gate->arrive(old); // Wait at gate if idle.
-            continue;
-          } else {
-            gate->leave();
-          }
-        }
-        process_manager->resume(process);
-      } while (true);
-
-      // Threads are joining. Delete the thread local `_executor_`
-      // pointer to prevent a memory leak.
-      delete _executor_;
-      _executor_ = nullptr;
-    }
-
-    // We hold a constant reference to `joining_threads` to make it clear that
-    // this value is only being tested (read), and not manipulated.
-    const std::atomic_bool& joining_threads;
-  } worker{joining_threads};
-
   // Create processing threads.
   for (long i = 0; i < num_worker_threads; i++) {
     // Retain the thread handles so that we can join when shutting down.
-    threads.emplace_back(new std::thread(worker));
+    threads.emplace_back(new std::thread(
+        [this]() {
+          running.fetch_add(1);
+          do {
+            ProcessBase* process = dequeue();
+            if (process == nullptr) {
+              if (joining_threads.load()) {
+                break;
+              }
+            } else {
+              resume(process);
+            }
+          } while (true);
+          running.fetch_sub(1);
+
+          // Threads are joining. Delete the thread local `_executor_`
+          // pointer to prevent a memory leak.
+          delete _executor_;
+          _executor_ = nullptr;
+        }));
   }
 
   // Create a thread for the event loop.
@@ -3281,9 +3268,6 @@ void ProcessManager::resume(ProcessBase* process)
   }
 
   __process__ = nullptr;
-
-  CHECK_GE(running.load(), 1);
-  running.fetch_sub(1);
 }
 
 
@@ -3463,23 +3447,22 @@ bool ProcessManager::wait(const UPID& pid)
       // Check if it is runnable in order to donate this thread.
       if (process->state == ProcessBase::BOTTOM ||
           process->state == ProcessBase::READY) {
-        synchronized (runq_mutex) {
-          list<ProcessBase*>::iterator it =
-            find(runq.begin(), runq.end(), process);
-          if (it != runq.end()) {
-            // Found it! Remove it from the run queue since we'll be
-            // donating our thread and also increment 'running' before
-            // leaving this 'runq' protected critical section so that
-            // everyone that is waiting for the processes to settle
-            // continue to wait (otherwise they could see nothing in
-            // 'runq' and 'running' equal to 0 between when we exit
-            // this critical section and increment 'running').
-            runq.erase(it);
-            running.fetch_add(1);
-          } else {
-            // Another thread has resumed the process ...
-            process = nullptr;
-          }
+        // Assume that we'll be able to successfully extract the
+        // process from the run queue and optimistically increment
+        // `running` so that `Clock::settle` properly waits. In the
+        // event that we aren't able to extract the process from the
+        // run queue then we'll decrement `running`. Note that we
+        // can't assume that `running` is already non-zero because any
+        // thread may call `wait`, and thus we can't assume that we're
+        // calling it from a process that is already running.
+        running.fetch_add(1);
+
+        // Try and extract the process from the run queue. This may
+        // fail because another thread might resume the process first
+        // or the run queue might not support arbitrary extraction.
+        if (!runq.extract(process)) {
+          running.fetch_sub(1);
+          process = nullptr;
         }
       } else {
         // Process is not runnable, so no need to donate ...
@@ -3491,12 +3474,21 @@ bool ProcessManager::wait(const UPID& pid)
   if (process != nullptr) {
     VLOG(2) << "Donating thread to " << process->pid << " while waiting";
     ProcessBase* donator = __process__;
-    process_manager->resume(process);
+    resume(process);
+    running.fetch_sub(1);
     __process__ = donator;
   }
 
+  // NOTE: `process` is possibly deleted at this point and we must not
+  // use it!
+
   // TODO(benh): Donating only once may not be sufficient, so we might
   // still deadlock here ... perhaps warn if that's the case?
+  //
+  // In fact, we might want to support the ability to donate a thread
+  // to any process for a limited number of messages while we wait
+  // (i.e., donate for a message, check and see if our gate is open,
+  // if not, keep donating).
 
   // Now arrive at the gate and wait until it opens.
   if (gate != nullptr) {
@@ -3571,13 +3563,7 @@ void ProcessManager::enqueue(ProcessBase* process)
   // it's not running. Otherwise, check and see which thread this
   // process was last running on, and put it on that threads runq.
 
-  synchronized (runq_mutex) {
-    CHECK(find(runq.begin(), runq.end(), process) == runq.end());
-    runq.push_back(process);
-  }
-
-  // Wake up the processing thread if necessary.
-  gate->open();
+  runq.enqueue(process);
 }
 
 
@@ -3587,44 +3573,83 @@ ProcessBase* ProcessManager::dequeue()
   // are no processes to run, and this is not a dedicated thread, then
   // steal one from another threads runq.
 
-  ProcessBase* process = nullptr;
+  running.fetch_sub(1);
 
-  synchronized (runq_mutex) {
-    if (!runq.empty()) {
-      process = runq.front();
-      runq.pop_front();
-      // Increment the running count of processes in order to support
-      // the Clock::settle() operation (this must be done atomically
-      // with removing the process from the runq).
-      running.fetch_add(1);
-    }
-  }
+  runq.wait();
+
+  // Need to increment `running` before we dequeue from `runq` so that
+  // `Clock::settle` properly waits.
+  running.fetch_add(1);
 
-  return process;
+  ////////////////////////////////////////////////////////////
+  // NOTE: contract with the run queue is that we'll always //
+  // call `wait` _BEFORE_ we call `dequeue`.                //
+  ////////////////////////////////////////////////////////////
+  return runq.dequeue();
 }
 
 
+// NOTE: it's possible that a thread not controlled by libprocess is
+// trying to enqueue a process (e.g., due to `spawn` or because it's
+// doing a `dispatch` or `send`) and thus we'll settle when in fact we
+// should not have. There is nothing easy we can do to prevent this
+// and it hasn't been a problem historically in the usage we've seen
+// in the Mesos project.
 void ProcessManager::settle()
 {
   bool done = true;
   do {
     done = true; // Assume to start that we are settled.
 
-    synchronized (runq_mutex) {
-      if (!runq.empty()) {
-        done = false;
-        continue;
-      }
+    // See comments below as to how `epoch` helps us mitigate races
+    // with `running` and `runq`.
+    long old = runq.epoch.load();
 
-      if (running.load() > 0) {
-        done = false;
-        continue;
-      }
+    if (running.load() > 0) {
+      done = false;
+      continue;
+    }
 
-      if (!Clock::settled()) {
-        done = false;
-        continue;
-      }
+    // Race #1: it's possible that a thread starts running here
+    // because the semaphore had been signaled but nobody has woken
+    // up yet.
+
+    if (!runq.empty()) {
+      done = false;
+      continue;
+    }
+
+    // Race #2: it's possible that `runq` will get added to at this
+    // point given some threads might be running due to 'Race #1'.
+
+    if (running.load() > 0) {
+      done = false;
+      continue;
+    }
+
+    // If at this point _no_ threads are running then it must be the
+    // case that either nothing has been added to `runq` (and thus
+    // nothing really is running or will be about to run) OR
+    // `runq.epoch` must have been incremented (because the thread
+    // that enqueued something into `runq` now isn't running so
+    // it must have incremented `runq.epoch` before it decremented
+    // `running`).
+    //
+    // Note that we check `runq.epoch` _after_ we check the clock
+    // because it's possible that the clock will also add to the
+    // `runq` but in so doing it will also increment `runq.epoch`
+    // which will guarantee that we don't settle (and
+    // `Clock::settled()` takes care to atomically ensure that
+    // `runq.epoch` is incremented before it returns).
+
+    if (!Clock::settled()) {
+      done = false;
+      continue;
+    }
+
+    if (old != runq.epoch.load()) {
+      done = false;
+      continue;
     }
   } while (!done);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/run_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/run_queue.hpp b/3rdparty/libprocess/src/run_queue.hpp
new file mode 100644
index 0000000..109c300
--- /dev/null
+++ b/3rdparty/libprocess/src/run_queue.hpp
@@ -0,0 +1,182 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_RUN_QUEUE_HPP__
+#define __PROCESS_RUN_QUEUE_HPP__
+
+// At _configuration_ (i.e., build) time you can pick the run queue
+// implementation: `--enable-lock-free-run-queue` (autotools) or
+// `ENABLE_LOCK_FREE_RUN_QUEUE` (CMake) defines LOCK_FREE_RUN_QUEUE and
+// selects LockFreeRunQueue. Otherwise we default to LockingRunQueue.
+//
+// Alternatively we could have made this be a _runtime_ decision but
+// for performance reasons we wanted the run queue implementation to
+// be compile-time optimized (e.g., inlined, etc).
+//
+// Note that care should be taken not to reconfigure with a different
+// run queue implementation when reusing a build directory!
+#define RUN_QUEUE LockingRunQueue
+
+#ifdef LOCK_FREE_RUN_QUEUE
+#define RUN_QUEUE LockFreeRunQueue
+#include <concurrentqueue.h>
+#endif // LOCK_FREE_RUN_QUEUE
+
+#include <algorithm>
+#include <list>
+
+#include <process/process.hpp>
+
+#include <stout/synchronized.hpp>
+
+#include "semaphore.hpp"
+
+namespace process {
+
+class LockingRunQueue
+{
+public:
+  bool extract(ProcessBase* process)
+  {
+    synchronized (mutex) {
+      std::list<ProcessBase*>::iterator it = std::find(
+          processes.begin(),
+          processes.end(),
+          process);
+
+      if (it != processes.end()) {
+        processes.erase(it);
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  void wait()
+  {
+    semaphore.wait();
+  }
+
+  void enqueue(ProcessBase* process)
+  {
+    synchronized (mutex) {
+      processes.push_back(process);
+    }
+    epoch.fetch_add(1);
+    semaphore.signal();
+  }
+
+  // Precondition: `wait` must get called before `dequeue`!
+  ProcessBase* dequeue()
+  {
+    synchronized (mutex) {
+      if (!processes.empty()) {
+        ProcessBase* process = processes.front();
+        processes.pop_front();
+        return process;
+      }
+    }
+
+    return nullptr;
+  }
+
+  // NOTE: this function can't be const because `synchronized (mutex)`
+  // is not const ...
+  bool empty()
+  {
+    synchronized (mutex) {
+      return processes.empty();
+    }
+  }
+
+  void decomission()
+  {
+    semaphore.decomission();
+  }
+
+  // Epoch used to capture changes to the run queue when settling.
+  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
+
+private:
+  std::list<ProcessBase*> processes;
+  std::mutex mutex;
+
+  // Semaphore used for threads to wait.
+  DecomissionableKernelSemaphore semaphore;
+};
+
+
+#ifdef LOCK_FREE_RUN_QUEUE
+class LockFreeRunQueue
+{
+public:
+  bool extract(ProcessBase*)
+  {
+    // NOTE: moodycamel::ConcurrentQueue does not provide a way to
+    // implement extract so we simply return false here.
+    return false;
+  }
+
+  void wait()
+  {
+    semaphore.wait();
+  }
+
+  void enqueue(ProcessBase* process)
+  {
+    queue.enqueue(process);
+    epoch.fetch_add(1);
+    semaphore.signal();
+  }
+
+  // Precondition: `wait` must get called before `dequeue`!
+  ProcessBase* dequeue()
+  {
+    // NOTE: we loop _forever_ until we actually dequeue a process
+    // because the contract for using the run queue is that `wait`
+    // must be called first so we know that there is something to be
+    // dequeued or the run queue has been decommissioned and we should
+    // just return `nullptr`.
+    ProcessBase* process = nullptr;
+    while (!queue.try_dequeue(process)) {
+      if (semaphore.decomissioned()) {
+        break;
+      }
+    }
+    return process;
+  }
+
+  bool empty() const
+  {
+    return queue.size_approx() == 0;
+  }
+
+  void decomission()
+  {
+    semaphore.decomission();
+  }
+
+  // Epoch used to capture changes to the run queue when settling.
+  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
+
+private:
+  moodycamel::ConcurrentQueue<ProcessBase*> queue;
+
+  // Semaphore used for threads to wait for the queue.
+  DecomissionableKernelSemaphore semaphore;
+};
+#endif // LOCK_FREE_RUN_QUEUE
+
+} // namespace process {
+
+#endif // __PROCESS_RUN_QUEUE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/semaphore.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/semaphore.hpp b/3rdparty/libprocess/src/semaphore.hpp
new file mode 100644
index 0000000..0143883
--- /dev/null
+++ b/3rdparty/libprocess/src/semaphore.hpp
@@ -0,0 +1,183 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_SEMAPHORE_HPP__
+#define __PROCESS_SEMAPHORE_HPP__
+
+#ifdef __MACH__
+#include <mach/mach.h>
+#elif __WINDOWS__
+#include <stout/windows.hpp>
+#else
+#include <semaphore.h>
+#endif // __MACH__
+
+#include <stout/check.hpp>
+
+// TODO(benh): Introduce a user-level semaphore that _only_ traps into
+// the kernel if the thread would actually need to wait.
+
+// TODO(benh): Add tests for these!
+
+#ifdef __MACH__
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    CHECK_EQ(
+        KERN_SUCCESS,
+        semaphore_create(mach_task_self(), &semaphore, SYNC_POLICY_FIFO, 0));
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_destroy(mach_task_self(), semaphore));
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_wait(semaphore));
+  }
+
+  void signal()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_signal(semaphore));
+  }
+
+private:
+  semaphore_t semaphore;
+};
+#elif __WINDOWS__
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    semaphore = CHECK_NOTNULL(CreateSemaphore(nullptr, 0, LONG_MAX, nullptr));
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    CHECK(CloseHandle(semaphore));
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    CHECK_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore, INFINITE));
+  }
+
+  void signal()
+  {
+    CHECK(ReleaseSemaphore(semaphore, 1, nullptr));
+  }
+
+private:
+  HANDLE semaphore;
+};
+#else
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    PCHECK(sem_init(&semaphore, 0, 0) == 0);
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    PCHECK(sem_destroy(&semaphore) == 0);
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    int result = sem_wait(&semaphore);
+
+    while (result != 0 && errno == EINTR) {
+      result = sem_wait(&semaphore);
+    }
+
+    PCHECK(result == 0);
+  }
+
+  void signal()
+  {
+    PCHECK(sem_post(&semaphore) == 0);
+  }
+
+private:
+  sem_t semaphore;
+};
+#endif // __MACH__
+
+
+// Provides a "decomissionable" kernel semaphore which allows us to
+// effectively flush all waiters and keep any future threads from
+// waiting. In order to be able to decomission the semaphore we need
+// to keep around the number of waiters so we can signal them all.
+class DecomissionableKernelSemaphore : public KernelSemaphore
+{
+public:
+  void wait()
+  {
+    // NOTE: we must check `commissioned` AFTER we have incremented
+    // `waiters` otherwise we might race with `decomission()` and fail
+    // to properly get signaled.
+    waiters.fetch_add(1);
+
+    if (!comissioned.load()) {
+      waiters.fetch_sub(1);
+      return;
+    }
+
+    KernelSemaphore::wait();
+
+    waiters.fetch_sub(1);
+  }
+
+  void decomission()
+  {
+    comissioned.store(false);
+
+    // Now signal all the waiters so they wake up and stop
+    // waiting. Note that this may do more `signal()` than necessary
+    // but since no future threads will wait that doesn't matter (it
+    // would only matter if we cared about the value of the semaphore
+    // which in the current implementation we don't).
+    for (size_t i = waiters.load(); i > 0; i--) {
+      signal();
+    }
+  }
+
+  bool decomissioned() const
+  {
+    return !comissioned.load();
+  }
+
+private:
+  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
+  std::atomic<size_t> waiters = ATOMIC_VAR_INIT(0);
+};
+
+#endif // __PROCESS_SEMAPHORE_HPP__


[7/7] mesos git commit: Added double-checked locking for filter.

Posted by be...@apache.org.
Added double-checked locking for filter.

When running in "production" a filter will (likely) not be set, yet
with the current code the threads will experience head-of-line
blocking because they'll all queue up on the mutex. Test code will
still queue up but we don't care about the performance in these
situations!

Note that this patch also moves the filter into ProcessManager in the
effort towards eliminating globals.

Review: https://reviews.apache.org/r/60871


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8f42d0c1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8f42d0c1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8f42d0c1

Branch: refs/heads/master
Commit: 8f42d0c112748e138cc607b945455045fdac4b20
Parents: 102e869
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 14 09:16:03 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:20:21 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/filter.hpp | 37 ++++++++++
 3rdparty/libprocess/src/process.cpp            | 81 ++++++++++-----------
 2 files changed, 75 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8f42d0c1/3rdparty/libprocess/include/process/filter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/filter.hpp b/3rdparty/libprocess/include/process/filter.hpp
index cafa7be..3f4a827 100644
--- a/3rdparty/libprocess/include/process/filter.hpp
+++ b/3rdparty/libprocess/include/process/filter.hpp
@@ -24,6 +24,43 @@ public:
   virtual bool filter(const DispatchEvent&) { return false; }
   virtual bool filter(const HttpEvent&) { return false; }
   virtual bool filter(const ExitedEvent&) { return false; }
+
+  virtual bool filter(Event* event)
+  {
+    bool result = false;
+    struct FilterVisitor : EventVisitor
+    {
+      explicit FilterVisitor(Filter* _filter, bool* _result)
+        : filter(_filter), result(_result) {}
+
+      virtual void visit(const MessageEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const DispatchEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const HttpEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      virtual void visit(const ExitedEvent& event)
+      {
+        *result = filter->filter(event);
+      }
+
+      Filter* filter;
+      bool* result;
+    } visitor(this, &result);
+
+    event->visit(&visitor);
+
+    return result;
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8f42d0c1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4ddeba3..b268cda 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -561,6 +561,18 @@ public:
   // The /__processes__ route.
   Future<Response> __processes__(const Request&);
 
+  void install(Filter* f)
+  {
+    // NOTE: even though `filter` is atomic we still need to
+    // synchronize updating it because once we return from this
+    // function the old filter might get deleted which could be bad if
+    // a thread is currently using the old filter in
+    // `ProcessManager::resume`.
+    synchronized (filter_mutex) {
+      filter.store(f);
+    }
+  }
+
 private:
   // Delegate process name to receive root HTTP requests.
   const Option<string> delegate;
@@ -594,6 +606,13 @@ private:
   // Whether the process manager is finalizing or not.
   // If true, no further processes will be spawned.
   std::atomic_bool finalizing;
+
+  // Filter. Synchronized support for using the filter needs to be
+  // recursive in case a filter wants to do anything fancy (which is
+  // possible and likely given that filters will get used for
+  // testing).
+  std::atomic<Filter*> filter = ATOMIC_VAR_INIT(nullptr);
+  std::recursive_mutex filter_mutex;
 };
 
 
@@ -640,12 +659,6 @@ static AuthorizationCallbacks* authorization_callbacks = nullptr;
 // Global route that returns process information.
 static Route* processes_route = nullptr;
 
-// Filter. Synchronized support for using the filterer needs to be
-// recursive in case a filterer wants to do anything fancy (which is
-// possible and likely given that filters will get used for testing).
-static Filter* filterer = nullptr;
-static std::recursive_mutex* filterer_mutex = new std::recursive_mutex();
-
 // Global garbage collector.
 GarbageCollector* gc = nullptr;
 
@@ -3198,39 +3211,23 @@ void ProcessManager::resume(ProcessBase* process)
       CHECK(event != nullptr);
 
       // Determine if we should filter this event.
-      synchronized (filterer_mutex) {
-        if (filterer != nullptr) {
-          bool filter = false;
-          struct FilterVisitor : EventVisitor
-          {
-            explicit FilterVisitor(bool* _filter) : filter(_filter) {}
-
-            virtual void visit(const MessageEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const DispatchEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const HttpEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            virtual void visit(const ExitedEvent& event)
-            {
-              *filter = filterer->filter(event);
-            }
-
-            bool* filter;
-          } visitor(&filter);
-
-          event->visit(&visitor);
-
-          if (filter) {
+      //
+      // NOTE: we use double-checked locking here to avoid
+      // head-of-line blocking that occurs when the first thread
+      // attempts to filter an event.
+      //
+      // TODO(benh): While not critical for production systems because
+      // the filter should not be set in production systems, we could
+      // use a reader/writer lock here in addition to double-checked
+      // locking.
+      //
+      // TODO(benh): Consider optimizing this further to not be
+      // sequentially consistent. For more details see:
+      // http://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11.
+      if (filter.load() != nullptr) {
+        synchronized (filter_mutex) {
+          Filter* f = filter.load();
+          if (f != nullptr && f->filter(event)) {
             delete event;
             continue; // Try and execute the next event.
           }
@@ -4180,13 +4177,11 @@ bool wait(const UPID& pid, const Duration& duration)
 }
 
 
-void filter(Filter *filter)
+void filter(Filter* filter)
 {
   process::initialize();
 
-  synchronized (filterer_mutex) {
-    filterer = filter;
-  }
+  process_manager->install(filter);
 }
 
 


[2/7] mesos git commit: Changes for libprocess Message optimization.

Posted by be...@apache.org.
Changes for libprocess Message optimization.

Review: https://reviews.apache.org/r/60884


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/102e8694
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/102e8694
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/102e8694

Branch: refs/heads/master
Commit: 102e86946d1f467cf30da7bd3f0fb89440866456
Parents: 7413a3c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jul 14 16:40:29 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/102e8694/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d895154..a902bfc 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1489,9 +1489,9 @@ void Master::visit(const MessageEvent& event)
   // framework and whether the framework has a principal so we use
   // these two temp variables to simplify the condition checks below.
   bool isRegisteredFramework =
-    frameworks.principals.contains(event.message->from);
+    frameworks.principals.contains(event.message.from);
   const Option<string> principal = isRegisteredFramework
-    ? frameworks.principals[event.message->from]
+    ? frameworks.principals[event.message.from]
     : Option<string>::none();
 
   // Increment the "message_received" counter if the message is from
@@ -1508,7 +1508,7 @@ void Master::visit(const MessageEvent& event)
 
   // All messages are filtered when non-leading.
   if (!elected()) {
-    VLOG(1) << "Dropping '" << event.message->name << "' message since "
+    VLOG(1) << "Dropping '" << event.message.name << "' message since "
             << "not elected yet";
     ++metrics->dropped_messages;
     return;
@@ -1522,7 +1522,7 @@ void Master::visit(const MessageEvent& event)
   // the additional queueing delay and the accumulated backlog
   // of messages post-recovery?
   if (!recovered.get().isReady()) {
-    VLOG(1) << "Dropping '" << event.message->name << "' message since "
+    VLOG(1) << "Dropping '" << event.message.name << "' message since "
             << "not recovered yet";
     ++metrics->dropped_messages;
     return;
@@ -1636,8 +1636,8 @@ void Master::_visit(const MessageEvent& event)
   // mapping may be deleted in handling 'UnregisterFrameworkMessage'
   // but its counter still needs to be incremented for this message.
   const Option<string> principal =
-    frameworks.principals.contains(event.message->from)
-      ? frameworks.principals[event.message->from]
+    frameworks.principals.contains(event.message.from)
+      ? frameworks.principals[event.message.from]
       : Option<string>::none();
 
   ProtobufProcess<Master>::visit(event);
@@ -1661,8 +1661,8 @@ void Master::exceededCapacity(
     const Option<string>& principal,
     uint64_t capacity)
 {
-  LOG(WARNING) << "Dropping message " << event.message->name << " from "
-               << event.message->from
+  LOG(WARNING) << "Dropping message " << event.message.name << " from "
+               << event.message.from
                << (principal.isSome() ? "(" + principal.get() + ")" : "")
                << ": capacity(" << capacity << ") exceeded";
 
@@ -1674,9 +1674,9 @@ void Master::exceededCapacity(
   // unrecoverable error and should take action to recover.
   FrameworkErrorMessage message;
   message.set_message(
-      "Message " + event.message->name +
+      "Message " + event.message.name +
       " dropped: capacity(" + stringify(capacity) + ") exceeded");
-  send(event.message->from, message);
+  send(event.message.from, message);
 }
 
 


[6/7] mesos git commit: Replaced std::map with hashmap for ProcessBase::handlers.

Posted by be...@apache.org.
Replaced std::map with hashmap for ProcessBase::handlers.

Review: https://reviews.apache.org/r/60830


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1b6d978
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1b6d978
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1b6d978

Branch: refs/heads/master
Commit: c1b6d978d0193ff34c28b83228f85c3e4a348153
Parents: 0d2a494
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jun 23 00:15:52 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/process.hpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c1b6d978/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 944fcc6..d40179f 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -32,6 +32,7 @@
 #include <process/pid.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
 #include <stout/synchronized.hpp>
@@ -449,8 +450,8 @@ private:
 
   // Handlers for messages and HTTP requests.
   struct {
-    std::map<std::string, MessageHandler> message;
-    std::map<std::string, HttpEndpoint> http;
+    hashmap<std::string, MessageHandler> message;
+    hashmap<std::string, HttpEndpoint> http;
 
     // Used for delivering HTTP requests in the correct order.
     // Initialized lazily to avoid ProcessBase requiring


[3/7] mesos git commit: Added --enable-lock-free-run-queue configuration to Mesos.

Posted by be...@apache.org.
Added --enable-lock-free-run-queue configuration to Mesos.

Review: https://reviews.apache.org/r/60979


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4936a32b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4936a32b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4936a32b

Branch: refs/heads/master
Commit: 4936a32bcf4bbd0b74778d327d047cb8fe17e4f2
Parents: 6076dbc
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Jul 18 22:26:46 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 configure.ac | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4936a32b/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 5f2ef52..b2eeeda 100644
--- a/configure.ac
+++ b/configure.ac
@@ -148,6 +148,11 @@ AC_CONFIG_FILES([mpi/mpiexec-mesos], [chmod +x mpi/mpiexec-mesos])
 # Optional features.
 ###############################################################################
 
+AC_ARG_ENABLE([lock_free_run_queue],
+              AS_HELP_STRING([--enable-lock-free-run-queue],
+                             [enables the lock free run queue in libprocess]),
+                             [], [enable_lock_free_run_queue=no])
+
 AC_ARG_ENABLE([hardening],
               AS_HELP_STRING([--disable-hardening],
                              [disables security measures such as stack
@@ -445,6 +450,10 @@ AS_IF([test "x${ac_cv_env_CXXFLAGS_set}" = "x"],
 # Compiler checks.
 ###############################################################################
 
+# Check if we should use the lock free run queue.
+AS_IF([test "x$enable_lock_free_run_queue" = "xyes"],
+      [AC_DEFINE([LOCK_FREE_RUN_QUEUE])])
+
 # Check to see if we should harden or not.
 AM_CONDITIONAL([ENABLE_HARDENING], [test x"$enable_hardening" = "xyes"])
 


[4/7] mesos git commit: Removed extra/unnecessary allocations of Message.

Posted by be...@apache.org.
Removed extra/unnecessary allocations of Message.

Review: https://reviews.apache.org/r/60831


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7413a3c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7413a3c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7413a3c4

Branch: refs/heads/master
Commit: 7413a3c43a82287519712f24151126453f0d82f6
Parents: c1b6d97
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Jul 11 17:38:01 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp   |  27 ++--
 3rdparty/libprocess/include/process/gmock.hpp   |  44 +++++--
 .../libprocess/include/process/protobuf.hpp     |   8 +-
 3rdparty/libprocess/src/encoder.hpp             |  66 ++++------
 3rdparty/libprocess/src/process.cpp             | 124 +++++++++----------
 3rdparty/libprocess/src/tests/process_tests.cpp |   4 +-
 3rdparty/libprocess/src/tests/test_linkee.cpp   |   2 +-
 7 files changed, 139 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 8afe626..a0ec053 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -86,30 +86,25 @@ struct Event
 
 struct MessageEvent : Event
 {
-  explicit MessageEvent(Message* _message)
-    : message(_message) {}
+  explicit MessageEvent(Message&& _message)
+    : message(std::move(_message)) {}
 
-  MessageEvent(const MessageEvent& that)
-    : message(that.message == nullptr ? nullptr : new Message(*that.message)) {}
+  MessageEvent(const MessageEvent& that) = default;
+  MessageEvent(MessageEvent&& that) = default;
 
-  virtual ~MessageEvent()
-  {
-    delete message;
-  }
+  // Keep MessageEvent not assignable even though we made it
+  // copyable.
+  // Note that we are violating the "rule of three" here but it helps
+  // keep the fields const.
+  MessageEvent& operator=(const MessageEvent&) = delete;
+  MessageEvent& operator=(MessageEvent&&) = delete;
 
   virtual void visit(EventVisitor* visitor) const
   {
     visitor->visit(*this);
   }
 
-  Message* const message;
-
-private:
-  // Keep MessageEvent not assignable even though we made it
-  // copyable.
-  // Note that we are violating the "rule of three" here but it helps
-  // keep the fields const.
-  MessageEvent& operator=(const MessageEvent&);
+  const Message message;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e9af943..231efcd 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -109,6 +109,32 @@ PromiseArgFieldActionP2<index, Field, process::Promise<T>*> FutureArgField(
 }
 
 
+ACTION_TEMPLATE(PromiseArgNotPointerField,
+                HAS_1_TEMPLATE_PARAMS(int, k),
+                AND_2_VALUE_PARAMS(field, promise))
+{
+  // TODO(benh): Use a shared_ptr for promise to defend against this
+  // action getting invoked more than once (e.g., used via
+  // WillRepeatedly). We won't be able to set it a second time but at
+  // least we won't get a segmentation fault. We could also consider
+  // warning users if they attempted to set it more than once.
+  promise->set(std::get<k>(args).*field);
+  delete promise;
+}
+
+
+template <int index, typename Field, typename T>
+PromiseArgNotPointerFieldActionP2<index, Field, process::Promise<T>*>
+FutureArgNotPointerField(
+    Field field,
+    process::Future<T>* future)
+{
+  process::Promise<T>* promise = new process::Promise<T>();
+  *future = promise->future();
+  return PromiseArgNotPointerField<index>(field, promise);
+}
+
+
 ACTION_P2(PromiseSatisfy, promise, value)
 {
   promise->set(value);
@@ -319,9 +345,9 @@ private:
 MATCHER_P3(MessageMatcher, name, from, to, "")
 {
   const MessageEvent& event = ::std::get<0>(arg);
-  return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
-          testing::Matcher<UPID>(from).Matches(event.message->from) &&
-          testing::Matcher<UPID>(to).Matches(event.message->to));
+  return (testing::Matcher<std::string>(name).Matches(event.message.name) &&
+          testing::Matcher<UPID>(from).Matches(event.message.from) &&
+          testing::Matcher<UPID>(to).Matches(event.message.to));
 }
 
 
@@ -334,11 +360,11 @@ MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
   message_type message;
 
   return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
-              event.message->name) &&
-          message.ParseFromString(event.message->body) &&
+              event.message.name) &&
+          message.ParseFromString(event.message.body) &&
           testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
-          testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
-          testing::Matcher<process::UPID>(to).Matches(event.message->to));
+          testing::Matcher<process::UPID>(from).Matches(event.message.from) &&
+          testing::Matcher<process::UPID>(to).Matches(event.message.to));
 }
 
 
@@ -440,7 +466,7 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
   synchronized (filter->mutex) {
     EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
       .With(MessageMatcher(name, from, to))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))
@@ -462,7 +488,7 @@ Future<process::Message> FutureUnionMessage(
   synchronized (filter->mutex) {
     EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
       .With(UnionMessageMatcher(message, unionType, from, to))
-      .WillOnce(testing::DoAll(FutureArgField<0>(
+      .WillOnce(testing::DoAll(FutureArgNotPointerField<0>(
                                    &MessageEvent::message,
                                    &future),
                                testing::Return(drop)))

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/include/process/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/protobuf.hpp b/3rdparty/libprocess/include/process/protobuf.hpp
index ba6e6d6..2b6b623 100644
--- a/3rdparty/libprocess/include/process/protobuf.hpp
+++ b/3rdparty/libprocess/include/process/protobuf.hpp
@@ -99,10 +99,10 @@ public:
 protected:
   virtual void visit(const process::MessageEvent& event)
   {
-    if (protobufHandlers.count(event.message->name) > 0) {
-      from = event.message->from; // For 'reply'.
-      protobufHandlers[event.message->name](
-          event.message->from, event.message->body);
+    if (protobufHandlers.count(event.message.name) > 0) {
+      from = event.message.from; // For 'reply'.
+      protobufHandlers[event.message.name](
+          event.message.from, event.message.body);
       from = process::UPID();
     } else {
       process::Process<T>::visit(event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 517ec21..70b5ec4 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -101,54 +101,42 @@ private:
 class MessageEncoder : public DataEncoder
 {
 public:
-  MessageEncoder(Message* _message)
-    : DataEncoder(encode(_message)), message(_message) {}
+  MessageEncoder(const Message& message)
+    : DataEncoder(encode(message)) {}
 
-  virtual ~MessageEncoder()
-  {
-    if (message != nullptr) {
-      delete message;
-    }
-  }
-
-  static std::string encode(Message* message)
+  static std::string encode(const Message& message)
   {
     std::ostringstream out;
 
-    if (message != nullptr) {
-      out << "POST ";
-      // Nothing keeps the 'id' component of a PID from being an empty
-      // string which would create a malformed path that has two
-      // '//' unless we check for it explicitly.
-      // TODO(benh): Make the 'id' part of a PID optional so when it's
-      // missing it's clear that we're simply addressing an ip:port.
-      if (message->to.id != "") {
-        out << "/" << message->to.id;
-      }
+    out << "POST ";
+    // Nothing keeps the 'id' component of a PID from being an empty
+    // string which would create a malformed path that has two
+    // '//' unless we check for it explicitly.
+    // TODO(benh): Make the 'id' part of a PID optional so when it's
+    // missing it's clear that we're simply addressing an ip:port.
+    if (message.to.id != "") {
+      out << "/" << message.to.id;
+    }
 
-      out << "/" << message->name << " HTTP/1.1\r\n"
-          << "User-Agent: libprocess/" << message->from << "\r\n"
-          << "Libprocess-From: " << message->from << "\r\n"
-          << "Connection: Keep-Alive\r\n"
-          << "Host: \r\n";
-
-      if (message->body.size() > 0) {
-        out << "Transfer-Encoding: chunked\r\n\r\n"
-            << std::hex << message->body.size() << "\r\n";
-        out.write(message->body.data(), message->body.size());
-        out << "\r\n"
-            << "0\r\n"
-            << "\r\n";
-      } else {
-        out << "\r\n";
-      }
+    out << "/" << message.name << " HTTP/1.1\r\n"
+        << "User-Agent: libprocess/" << message.from << "\r\n"
+        << "Libprocess-From: " << message.from << "\r\n"
+        << "Connection: Keep-Alive\r\n"
+        << "Host: \r\n";
+
+    if (message.body.size() > 0) {
+      out << "Transfer-Encoding: chunked\r\n\r\n"
+          << std::hex << message.body.size() << "\r\n";
+      out.write(message.body.data(), message.body.size());
+      out << "\r\n"
+          << "0\r\n"
+          << "\r\n";
+    } else {
+      out << "\r\n";
     }
 
     return out.str();
   }
-
-private:
-  Message* message;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 182dd91..4ddeba3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -426,7 +426,7 @@ public:
   void send(const Response& response,
             const Request& request,
             const Socket& socket);
-  void send(Message* message,
+  void send(Message&& message,
             const SocketImpl::Kind& kind = SocketImpl::DEFAULT_KIND());
 
   Encoder* next(int_fd s);
@@ -467,7 +467,7 @@ private:
   void send_connect(
       const Future<Nothing>& future,
       Socket socket,
-      Message* message);
+      Message&& message);
 
   // Collection of all active sockets (both inbound and outbound).
   hashmap<int_fd, Socket> sockets;
@@ -740,28 +740,27 @@ void Clock::settle()
 }
 
 
-static Message* encode(const UPID& from,
-                       const UPID& to,
-                       const string& name,
-                       const string& data = "")
+static Message encode(
+    const UPID& from,
+    const UPID& to,
+    const string& name,
+    const char* data,
+    size_t length)
 {
-  Message* message = new Message();
-  message->from = from;
-  message->to = to;
-  message->name = name;
-  message->body = data;
+  Message message{name, from, to, string(data, length)};
   return message;
 }
 
 
-static void transport(Message* message, ProcessBase* sender = nullptr)
+static void transport(Message&& message, ProcessBase* sender = nullptr)
 {
-  if (message->to.address == __address__) {
+  if (message.to.address == __address__) {
     // Local message.
-    process_manager->deliver(message->to, new MessageEvent(message), sender);
+    MessageEvent* event = new MessageEvent(std::move(message));
+    process_manager->deliver(event->message.to, event, sender);
   } else {
     // Remote message.
-    socket_manager->send(message);
+    socket_manager->send(std::move(message));
   }
 }
 
@@ -802,7 +801,7 @@ static Future<Owned<Request>> convert(Owned<Request>&& pipeRequest)
 }
 
 
-static Future<Message*> parse(const Request& request)
+static Future<MessageEvent*> parse(const Request& request)
 {
   // TODO(benh): Do better error handling (to deal with a malformed
   // libprocess message, malicious or otherwise).
@@ -856,13 +855,13 @@ static Future<Message*> parse(const Request& request)
 
   return reader.readAll()
     .then([from, name, to](const string& body) {
-      Message* message = new Message();
-      message->name = name;
-      message->from = from.get();
-      message->to = to;
-      message->body = body;
+      Message message;
+      message.name = name;
+      message.from = from.get();
+      message.to = to;
+      message.body = body;
 
-      return message;
+      return new MessageEvent(std::move(message));
     });
 }
 
@@ -2192,12 +2191,12 @@ void SocketManager::send(
 void SocketManager::send_connect(
     const Future<Nothing>& future,
     Socket socket,
-    Message* message)
+    Message&& message)
 {
   if (future.isDiscarded() || future.isFailed()) {
     if (future.isFailed()) {
-      VLOG(1) << "Failed to send '" << message->name << "' to '"
-              << message->to.address << "', connect: " << future.failure();
+      VLOG(1) << "Failed to send '" << message.name << "' to '"
+              << message.to.address << "', connect: " << future.failure();
     }
 
     // Check if SSL is enabled, and whether we allow a downgrade to
@@ -2219,7 +2218,6 @@ void SocketManager::send_connect(
         if (create.isError()) {
           VLOG(1) << "Failed to link, create socket: " << create.error();
           socket_manager->close(socket);
-          delete message;
           return;
         }
 
@@ -2234,13 +2232,13 @@ void SocketManager::send_connect(
       }
 
       CHECK_SOME(poll_socket);
-      poll_socket.get().connect(message->to.address)
+      poll_socket.get().connect(message.to.address)
         .onAny(lambda::bind(
-            &SocketManager::send_connect,
-            this,
-            lambda::_1,
-            poll_socket.get(),
-            message));
+            // TODO(benh): with C++14 we can use lambda instead of
+            // `std::bind` and capture `message` with a `std::move`.
+            [this, poll_socket](Message& message, const Future<Nothing>& f) {
+              send_connect(f, poll_socket.get(), std::move(message));
+            }, std::move(message), lambda::_1));
 
       // We don't need to 'shutdown()' the socket as it was never
       // connected.
@@ -2250,7 +2248,6 @@ void SocketManager::send_connect(
 
     socket_manager->close(socket);
 
-    delete message;
     return;
   }
 
@@ -2274,11 +2271,9 @@ void SocketManager::send_connect(
 }
 
 
-void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
+void SocketManager::send(Message&& message, const SocketImpl::Kind& kind)
 {
-  CHECK(message != nullptr);
-
-  const Address& address = message->to.address;
+  const Address& address = message.to.address;
 
   Option<Socket> socket = None();
   bool connect = false;
@@ -2315,7 +2310,6 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
       Try<Socket> create = Socket::create(kind);
       if (create.isError()) {
         VLOG(1) << "Failed to send, create socket: " << create.error();
-        delete message;
         return;
       }
       socket = create.get();
@@ -2340,11 +2334,11 @@ void SocketManager::send(Message* message, const SocketImpl::Kind& kind)
     CHECK_SOME(socket);
     socket->connect(address)
       .onAny(lambda::bind(
-          &SocketManager::send_connect,
-          this,
-          lambda::_1,
-          socket.get(),
-          message));
+            // TODO(benh): with C++14 we can use lambda instead of
+            // `std::bind` and capture `message` with a `std::move`.
+            [this, socket](Message& message, const Future<Nothing>& f) {
+              send_connect(f, socket.get(), std::move(message));
+            }, std::move(message), lambda::_1));
   } else {
     // If we're not connecting and we haven't added the encoder to
     // the 'outgoing' queue then schedule it to be sent.
@@ -2897,7 +2891,7 @@ void ProcessManager::handle(
     // from `SocketManager::finalize()` due to it closing all active sockets
     // during libprocess finalization.
     parse(*request)
-      .onAny([this, socket, request](const Future<Message*>& future) {
+      .onAny([this, socket, request](const Future<MessageEvent*>& future) {
         // Get the HttpProxy pid for this socket.
         PID<HttpProxy> proxy = socket_manager->proxy(socket);
 
@@ -2914,7 +2908,7 @@ void ProcessManager::handle(
           return;
         }
 
-        Message* message = CHECK_NOTNULL(future.get());
+        MessageEvent* event = CHECK_NOTNULL(future.get());
 
         // Verify that the UPID this peer is claiming is on the same IP
         // address the peer is sending from.
@@ -2927,10 +2921,10 @@ void ProcessManager::handle(
             network::convert<Address>(request->client.get());
 
           if (client_ip_address.isError() ||
-              message->from.address.ip != client_ip_address->ip) {
+              event->message.from.address.ip != client_ip_address->ip) {
             Response response = BadRequest(
                 "UPID IP address validation failed: Message from " +
-                stringify(message->from) + " was sent from IP " +
+                stringify(event->message.from) + " was sent from IP " +
                 stringify(request->client.get()));
 
             dispatch(proxy, &HttpProxy::enqueue, response, *request);
@@ -2940,14 +2934,14 @@ void ProcessManager::handle(
                     << ": " << response.body;
 
             delete request;
-            delete message;
+            delete event;
             return;
           }
         }
 
         // TODO(benh): Use the sender PID when delivering in order to
         // capture happens-before timing relationships for testing.
-        bool accepted = deliver(message->to, new MessageEvent(message));
+        bool accepted = deliver(event->message.to, event);
 
         // Only send back an HTTP response if this isn't from libprocess
         // (which we determine by looking at the User-Agent). This is
@@ -3676,7 +3670,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
           JSON::Object object;
           object.values["type"] = "MESSAGE";
 
-          const Message& message = *event.message;
+          const Message& message = event.message;
 
           object.values["name"] = message.name;
           object.values["from"] = string(message.from);
@@ -3798,9 +3792,9 @@ void ProcessBase::inject(
   if (!from)
     return;
 
-  Message* message = encode(from, pid, name, string(data, length));
+  Message message = encode(from, pid, name, data, length);
 
-  enqueue(new MessageEvent(message), true);
+  enqueue(new MessageEvent(std::move(message)), true);
 }
 
 
@@ -3815,22 +3809,22 @@ void ProcessBase::send(
   }
 
   // Encode and transport outgoing message.
-  transport(encode(pid, to, name, string(data, length)), this);
+  transport(encode(pid, to, name, data, length), this);
 }
 
 
 void ProcessBase::visit(const MessageEvent& event)
 {
-  if (handlers.message.count(event.message->name) > 0) {
-    handlers.message[event.message->name](
-        event.message->from,
-        event.message->body);
-  } else if (delegates.count(event.message->name) > 0) {
-    VLOG(1) << "Delegating message '" << event.message->name
-            << "' to " << delegates[event.message->name];
-    Message* message = new Message(*event.message);
-    message->to = delegates[event.message->name];
-    transport(message, this);
+  if (handlers.message.count(event.message.name) > 0) {
+    handlers.message[event.message.name](
+        event.message.from,
+        event.message.body);
+  } else if (delegates.count(event.message.name) > 0) {
+    VLOG(1) << "Delegating message '" << event.message.name
+            << "' to " << delegates[event.message.name];
+    Message message(event.message);
+    message.to = delegates[event.message.name];
+    transport(std::move(message), this);
   }
 }
 
@@ -4205,7 +4199,7 @@ void post(const UPID& to, const string& name, const char* data, size_t length)
   }
 
   // Encode and transport outgoing message.
-  transport(encode(UPID(), to, name, string(data, length)));
+  transport(encode(UPID(), to, name, data, length));
 }
 
 
@@ -4222,7 +4216,7 @@ void post(const UPID& from,
   }
 
   // Encode and transport outgoing message.
-  transport(encode(from, to, name, string(data, length)));
+  transport(encode(from, to, name, data, length));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index c610954..ed11909 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -741,7 +741,7 @@ protected:
     AWAIT_ASSERT_READY(event);
 
     // Save the PID of the linkee.
-    pid = event->message->from;
+    pid = event->message.from;
 
     terminate(coordinator);
     wait(coordinator);
@@ -1272,7 +1272,7 @@ TEST(ProcessTest, THREADSAFE_Remote)
   message.from = UPID("sender", sender.get());
   message.to = process.self();
 
-  const string data = MessageEncoder::encode(&message);
+  const string data = MessageEncoder::encode(message);
 
   AWAIT_READY(socket.send(data));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7413a3c4/3rdparty/libprocess/src/tests/test_linkee.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/test_linkee.cpp b/3rdparty/libprocess/src/tests/test_linkee.cpp
index 29df3c5..cc48271 100644
--- a/3rdparty/libprocess/src/tests/test_linkee.cpp
+++ b/3rdparty/libprocess/src/tests/test_linkee.cpp
@@ -187,7 +187,7 @@ int main(int argc, char** argv)
       message.from = UPID("(1)", address);
       message.to = parent;
 
-      outgoing->send(MessageEncoder::encode(&message));
+      outgoing->send(MessageEncoder::encode(message));
     });
 
   // Now sit and accept links until the linkee is killed.


[5/7] mesos git commit: Replaced std::map with hashmap for ProcessManager::processes.

Posted by be...@apache.org.
Replaced std::map with hashmap for ProcessManager::processes.

Review: https://reviews.apache.org/r/60829


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d2a4940
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d2a4940
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d2a4940

Branch: refs/heads/master
Commit: 0d2a4940bc118c9ace7be5ca25065e42c454130c
Parents: 4936a32
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jun 22 23:49:00 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Jul 19 13:18:40 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d2a4940/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index dff78b0..182dd91 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -566,7 +566,7 @@ private:
   const Option<string> delegate;
 
   // Map of all local spawned and running processes.
-  map<string, ProcessBase*> processes;
+  hashmap<string, ProcessBase*> processes;
   std::recursive_mutex processes_mutex;
 
   // Gates for waiting threads (protected by processes_mutex).
@@ -2848,11 +2848,12 @@ ProcessReference ProcessManager::use(const UPID& pid)
 {
   if (pid.address == __address__) {
     synchronized (processes_mutex) {
-      if (processes.count(pid.id) > 0) {
+      Option<ProcessBase*> process = processes.get(pid.id);
+      if (process.isSome()) {
         // Note that the ProcessReference constructor _must_ get
         // called while holding the lock on processes so that waiting
         // for references is atomic (i.e., race free).
-        return ProcessReference(processes[pid.id]);
+        return ProcessReference(process.get());
       }
     }
   }