You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/11 04:40:01 UTC

[1/4] mesos git commit: mesos: Replace volatile with std::atomic.

Repository: mesos
Updated Branches:
  refs/heads/master 81efd727e -> 87de003c6


mesos: Replace volatile with std::atomic.

MESOS-3326.

Review: https://reviews.apache.org/r/37878


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87de003c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87de003c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87de003c

Branch: refs/heads/master
Commit: 87de003c6e8a4dfc2d29ae8b0aab1f83ff0c66a3
Parents: 4b93805
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:33 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp   | 32 +++++++++++++++----------------
 src/sched/sched.cpp | 50 +++++++++++++++++++++++-------------------------
 2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 31e0c2f..7b51baa 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -20,6 +20,7 @@
 
 #include <sys/types.h>
 
+#include <atomic>
 #include <iostream>
 #include <string>
 #include <sstream>
@@ -198,7 +199,7 @@ protected:
                   const SlaveID& slaveId,
                   const SlaveInfo& slaveInfo)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring registered message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -221,7 +222,7 @@ protected:
 
   void reregistered(const SlaveID& slaveId, const SlaveInfo& slaveInfo)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring re-registered message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -244,7 +245,7 @@ protected:
 
   void reconnect(const UPID& from, const SlaveID& slaveId)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring reconnect message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -280,7 +281,7 @@ protected:
 
   void runTask(const TaskInfo& task)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring run task message for task " << task.task_id()
               << " because the driver is aborted!";
       return;
@@ -305,7 +306,7 @@ protected:
 
   void killTask(const TaskID& taskId)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring kill task message for task " << taskId
               <<" because the driver is aborted!";
       return;
@@ -329,7 +330,7 @@ protected:
       const TaskID& taskId,
       const string& uuid)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring status update acknowledgement "
               << UUID::fromBytes(uuid) << " for task " << taskId
               << " of framework " << frameworkId
@@ -353,7 +354,7 @@ protected:
                         const ExecutorID& executorId,
                         const string& data)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring framework message because the driver is aborted!";
       return;
     }
@@ -372,7 +373,7 @@ protected:
 
   void shutdown()
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring shutdown message because the driver is aborted!";
       return;
     }
@@ -394,7 +395,7 @@ protected:
 
     VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
 
-    aborted = true; // To make sure not to accept any new messages.
+    aborted.store(true); // To make sure not to accept any new messages.
 
     if (local) {
       terminate(this);
@@ -413,7 +414,7 @@ protected:
   void abort()
   {
     LOG(INFO) << "Deactivating the executor libprocess";
-    CHECK(aborted);
+    CHECK(aborted.load());
 
     synchronized (mutex) {
       CHECK_NOTNULL(latch)->trigger();
@@ -439,7 +440,7 @@ protected:
 
   virtual void exited(const UPID& pid)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring exited event because the driver is aborted!";
       return;
     }
@@ -478,7 +479,7 @@ protected:
 
     VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
 
-    aborted = true; // To make sure not to accept any new messages.
+    aborted.store(true); // To make sure not to accept any new messages.
 
     // This is a pretty bad state ... no slave is left. Rather
     // than exit lets kill our process group (which includes
@@ -543,7 +544,7 @@ private:
   bool connected; // Registered with the slave.
   UUID connection; // UUID to identify the connection instance.
   bool local;
-  volatile bool aborted;
+  std::atomic_bool aborted;
   std::recursive_mutex* mutex;
   Latch* latch;
   const string directory;
@@ -750,12 +751,11 @@ Status MesosExecutorDriver::abort()
 
     CHECK(process != NULL);
 
-    // We set the volatile aborted to true here to prevent any further
+    // We set the atomic aborted to true here to prevent any further
     // messages from being processed in the ExecutorProcess. However,
     // if abort() is called from another thread as the ExecutorProcess,
     // there may be at most one additional message processed.
-    // TODO(bmahler): Use an atomic boolean.
-    process->aborted = true;
+    process->aborted.store(true);
 
     // Dispatching here ensures that we still process the outstanding
     // requests *from* the executor, since those do proceed when

http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 012af05..1fc9e73 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -221,7 +221,7 @@ protected:
 
   void detected(const Future<Option<MasterInfo> >& _master)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring the master change because the driver is not"
               << " running!";
       return;
@@ -292,7 +292,7 @@ protected:
 
   void authenticate()
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring authenticate because the driver is not running!";
       return;
     }
@@ -360,7 +360,7 @@ protected:
 
   void _authenticate()
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring _authenticate because the driver is not running!";
       return;
     }
@@ -415,7 +415,7 @@ protected:
 
   void authenticationTimeout(Future<bool> future)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring authentication timeout because "
               << "the driver is not running!";
       return;
@@ -617,7 +617,7 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring framework registered message because "
               << "the driver is not running!";
       return;
@@ -659,7 +659,7 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring framework re-registered message because "
               << "the driver is not running!";
       return;
@@ -698,7 +698,7 @@ protected:
 
   void doReliableRegistration(Duration maxBackoff)
   {
-    if (!running) {
+    if (!running.load()) {
       return;
     }
 
@@ -755,7 +755,7 @@ protected:
       const vector<Offer>& offers,
       const vector<string>& pids)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring resource offers message because "
               << "the driver is not running!";
       return;
@@ -805,7 +805,7 @@ protected:
 
   void rescindOffer(const UPID& from, const OfferID& offerId)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring rescind offer message because "
               << "the driver is not running!";
       return;
@@ -845,7 +845,7 @@ protected:
       const StatusUpdate& update,
       const UPID& pid)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring task status update message because "
               << "the driver is not running!";
       return;
@@ -910,10 +910,10 @@ protected:
     VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
 
     if (implicitAcknowledgements) {
-      // Note that we need to look at the volatile 'running' here
+      // Note that we need to look at the atomic 'running' here
       // so that we don't acknowledge the update if the driver was
       // aborted during the processing of the update.
-      if (!running) {
+      if (!running.load()) {
         VLOG(1) << "Not sending status update acknowledgment message because "
                 << "the driver is not running!";
         return;
@@ -948,7 +948,7 @@ protected:
 
   void lostSlave(const UPID& from, const SlaveID& slaveId)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring lost slave message because the driver is not"
               << " running!";
       return;
@@ -988,7 +988,7 @@ protected:
       const ExecutorID& executorId,
       const string& data)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1)
         << "Ignoring framework message because the driver is not running!";
       return;
@@ -1008,7 +1008,7 @@ protected:
 
   void error(const string& message)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring error message because the driver is not running!";
       return;
     }
@@ -1061,7 +1061,7 @@ protected:
   {
     LOG(INFO) << "Aborting framework '" << framework.id() << "'";
 
-    CHECK(!running);
+    CHECK(!running.load());
 
     if (!connected) {
       VLOG(1) << "Not sending a deactivate message as master is disconnected";
@@ -1248,11 +1248,11 @@ protected:
       return;
     }
 
-    // NOTE: By ignoring the volatile 'running' here, we ensure that
-    // all acknowledgements requested before the driver was stopped
-    // or aborted are processed. Any acknowledgement that is requested
-    // after the driver stops or aborts (running == false) will be
-    // dropped in the driver before reaching here.
+    // NOTE: By ignoring the atomic 'running' here, we ensure that all
+    // acknowledgements requested before the driver was stopped or
+    // aborted are processed. Any acknowledgement that is requested
+    // after the driver stops or aborts (running.load() == false) will
+    // be dropped in the driver before reaching here.
 
     // Only statuses with a 'uuid' and a 'slave_id' need to have
     // acknowledgements sent to the master. Note that the driver
@@ -1427,9 +1427,7 @@ private:
   // there may be one additional callback delivered to the scheduler.
   // This could happen if the SchedulerProcess is in the middle of
   // processing an event.
-  // TODO(vinod): Instead of 'volatile' use std::atomic() to guarantee
-  // atomicity.
-  volatile bool running; // Flag to indicate if the driver is running.
+  std::atomic_bool running; // Flag to indicate if the driver is running.
 
   MasterDetector* detector;
 
@@ -1757,7 +1755,7 @@ Status MesosSchedulerDriver::stop(bool failover)
     // it due to bad parameters (e.g. error in creating the detector
     // or loading flags).
     if (process != NULL) {
-      process->running =  false;
+      process->running.store(false);
       dispatch(process, &SchedulerProcess::stop, failover);
     }
 
@@ -1788,7 +1786,7 @@ Status MesosSchedulerDriver::abort()
     }
 
     CHECK_NOTNULL(process);
-    process->running = false;
+    process->running.store(false);
 
     // Dispatching here ensures that we still process the outstanding
     // requests *from* the scheduler, since those do proceed when


[3/4] mesos git commit: libprocess: Replace GCC instrinsics and volatile with std::atomic.

Posted by jo...@apache.org.
libprocess: Replace GCC instrinsics and volatile with std::atomic.

MESOS-3326.

Review: https://reviews.apache.org/r/37877


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b938052
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b938052
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b938052

Branch: refs/heads/master
Commit: 4b938052b6af124eb1fdaec9b597c620627677ea
Parents: 4a01850
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:22 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/latch.hpp   |  4 +-
 .../include/process/metrics/counter.hpp         | 15 +++---
 3rdparty/libprocess/include/process/process.hpp |  2 +-
 3rdparty/libprocess/src/clock.cpp               |  5 +-
 3rdparty/libprocess/src/latch.cpp               | 15 +++---
 3rdparty/libprocess/src/process.cpp             | 52 ++++++++++----------
 3rdparty/libprocess/src/process_reference.hpp   |  8 +--
 7 files changed, 51 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/latch.hpp b/3rdparty/libprocess/include/process/latch.hpp
index a1a2227..8a9d121 100644
--- a/3rdparty/libprocess/include/process/latch.hpp
+++ b/3rdparty/libprocess/include/process/latch.hpp
@@ -15,6 +15,8 @@
 #ifndef __PROCESS_LATCH_HPP__
 #define __PROCESS_LATCH_HPP__
 
+#include <atomic>
+
 #include <process/pid.hpp>
 
 #include <stout/duration.hpp>
@@ -43,7 +45,7 @@ private:
   Latch(const Latch& that);
   Latch& operator=(const Latch& that);
 
-  bool triggered;
+  std::atomic_bool triggered;
   UPID pid;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/metrics/counter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp
index e51a8be..fd8be32 100644
--- a/3rdparty/libprocess/include/process/metrics/counter.hpp
+++ b/3rdparty/libprocess/include/process/metrics/counter.hpp
@@ -35,19 +35,20 @@ public:
     : Metric(name, window),
       data(new Data())
   {
-    push(data->v);
+    push(data->value.load());
   }
 
   virtual ~Counter() {}
 
   virtual Future<double> value() const
   {
-    return static_cast<double>(data->v);
+    return static_cast<double>(data->value.load());
   }
 
   void reset()
   {
-    push(__sync_and_and_fetch(&data->v, 0));
+    data->value.store(0);
+    push(0);
   }
 
   Counter& operator++()
@@ -64,17 +65,17 @@ public:
 
   Counter& operator+=(int64_t v)
   {
-    push(__sync_add_and_fetch(&data->v, v));
+    int64_t prev = data->value.fetch_add(v);
+    push(prev + v);
     return *this;
   }
 
 private:
   struct Data
   {
-    explicit Data() : v(0) {}
+    explicit Data() : value(0) {}
 
-    // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands.
-    volatile int64_t v;
+    std::atomic<int64_t> value;
   };
 
   std::shared_ptr<Data> data;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index cc8317f..8b086f2 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -332,7 +332,7 @@ private:
   std::deque<Event*> events;
 
   // Active references.
-  int refs;
+  std::atomic_long refs;
 
   // Process PID.
   UPID pid;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 09c60e5..5806098 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -247,14 +247,15 @@ Timer Clock::timer(
     const Duration& duration,
     const lambda::function<void(void)>& thunk)
 {
-  static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
+  // Start at 1 since Timer() instances use id 0.
+  static std::atomic<uint64_t> id(1);
 
   // Assumes Clock::now() does Clock::now(__process__).
   Timeout timeout = Timeout::in(duration);
 
   UPID pid = __process__ != NULL ? __process__->self() : UPID();
 
-  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
+  Timer timer(id.fetch_add(1), timeout, pid, thunk);
 
   VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
           << " in the future (" << timeout.time() << ")";

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/latch.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/latch.cpp b/3rdparty/libprocess/src/latch.cpp
index f7d94d9..f433a05 100644
--- a/3rdparty/libprocess/src/latch.cpp
+++ b/3rdparty/libprocess/src/latch.cpp
@@ -25,10 +25,8 @@ namespace process {
 // within libprocess such that it doesn't cost a memory allocation, a
 // spawn, a message send, a wait, and two user-space context-switchs.
 
-Latch::Latch()
+Latch::Latch() : triggered(false)
 {
-  triggered = false;
-
   // Deadlock is possible if one thread is trying to delete a latch
   // but the libprocess thread(s) is trying to acquire a resource the
   // deleting thread is holding. Hence, we only save the PID for
@@ -40,7 +38,8 @@ Latch::Latch()
 
 Latch::~Latch()
 {
-  if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+  bool expected = false;
+  if (triggered.compare_exchange_strong(expected, true)) {
     terminate(pid);
   }
 }
@@ -48,8 +47,8 @@ Latch::~Latch()
 
 bool Latch::trigger()
 {
-  // TODO(benh): Use std::atomic when C++11 rolls out.
-  if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+  bool expected = false;
+  if (triggered.compare_exchange_strong(expected, true)) {
     terminate(pid);
     return true;
   }
@@ -59,7 +58,7 @@ bool Latch::trigger()
 
 bool Latch::await(const Duration& duration)
 {
-  if (!triggered) {
+  if (!triggered.load()) {
     process::wait(pid, duration); // Explict to disambiguate.
     // It's possible that we failed to wait because:
     //   (1) Our process has already terminated.
@@ -71,7 +70,7 @@ bool Latch::await(const Duration& duration)
     // 'triggered' (which will also capture cases where we actually
     // timed out but have since triggered, which seems like an
     // acceptable semantics given such a "tie").
-    return triggered;
+    return triggered.load();
   }
 
   return true;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0e5394a..4afa305 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -427,7 +427,7 @@ private:
   std::recursive_mutex runq_mutex;
 
   // Number of running processes, to support Clock::settle operation.
-  int running;
+  std::atomic_long running;
 
   // List of rules applied to all incoming HTTP requests.
   vector<Owned<FirewallRule>> firewallRules;
@@ -746,23 +746,26 @@ void install(vector<Owned<FirewallRule>>&& rules)
 void initialize(const string& delegate)
 {
   // TODO(benh): Return an error if attempting to initialize again
-  // with a different delegate then originally specified.
+  // with a different delegate than originally specified.
 
   // static pthread_once_t init = PTHREAD_ONCE_INIT;
   // pthread_once(&init, ...);
 
-  static volatile bool initialized = false;
-  static volatile bool initializing = true;
+  static std::atomic_bool initialized(false);
+  static std::atomic_bool initializing(true);
 
   // Try and do the initialization or wait for it to complete.
-  if (initialized && !initializing) {
+  // TODO(neilc): Try to simplify and/or document this logic.
+  if (initialized.load() && !initializing.load()) {
     return;
-  } else if (initialized && initializing) {
-    while (initializing);
+  } else if (initialized.load() && initializing.load()) {
+    while (initializing.load());
     return;
   } else {
-    if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
-      while (initializing);
+    // `compare_exchange_strong` needs an lvalue.
+    bool expected = false;
+    if (!initialized.compare_exchange_strong(expected, true)) {
+      while (initializing.load());
       return;
     }
   }
@@ -945,9 +948,9 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
-  // Need to set initialzing here so that we can actually invoke
-  // 'spawn' below for the garbage collector.
-  initializing = false;
+  // Need to set `initializing` here so that we can actually invoke `spawn()`
+  // below for the garbage collector.
+  initializing.store(false);
 
   __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
@@ -998,7 +1001,7 @@ void finalize()
 {
   delete process_manager;
 
-  // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt
+  // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt
   // to dereference 'process_manager' in the 'timedout' callback.
 }
 
@@ -2111,8 +2114,7 @@ void SocketManager::swap_implementing_socket(const Socket& from, Socket* to)
 ProcessManager::ProcessManager(const string& _delegate)
   : delegate(_delegate)
 {
-  running = 0;
-  __sync_synchronize(); // Ensure write to 'running' visible in other threads.
+  running.store(0);
 }
 
 
@@ -2485,8 +2487,8 @@ void ProcessManager::resume(ProcessBase* process)
 
   __process__ = NULL;
 
-  CHECK_GE(running, 1);
-  __sync_fetch_and_sub(&running, 1);
+  CHECK_GE(running.load(), 1);
+  running.fetch_sub(1);
 }
 
 
@@ -2525,11 +2527,10 @@ void ProcessManager::cleanup(ProcessBase* process)
   // Remove process.
   synchronized (processes_mutex) {
     // Wait for all process references to get cleaned up.
-    while (process->refs > 0) {
+    while (process->refs.load() > 0) {
 #if defined(__i386__) || defined(__x86_64__)
       asm ("pause");
 #endif
-      __sync_synchronize();
     }
 
     synchronized (process->mutex) {
@@ -2545,7 +2546,7 @@ void ProcessManager::cleanup(ProcessBase* process)
         gates.erase(it);
       }
 
-      CHECK(process->refs == 0);
+      CHECK(process->refs.load() == 0);
       process->state = ProcessBase::TERMINATED;
     }
 
@@ -2672,7 +2673,7 @@ bool ProcessManager::wait(const UPID& pid)
             // 'runq' and 'running' equal to 0 between when we exit
             // this critical section and increment 'running').
             runq.erase(it);
-            __sync_fetch_and_add(&running, 1);
+            running.fetch_add(1);
           } else {
             // Another thread has resumed the process ...
             process = NULL;
@@ -2783,7 +2784,7 @@ ProcessBase* ProcessManager::dequeue()
       // Increment the running count of processes in order to support
       // the Clock::settle() operation (this must be done atomically
       // with removing the process from the runq).
-      __sync_fetch_and_add(&running, 1);
+      running.fetch_add(1);
     }
   }
 
@@ -2803,7 +2804,7 @@ void ProcessManager::settle()
     // expect the http::get will have properly enqueued a process on
     // the run queue but http::get is just sending bytes on a
     // socket. Without sleeping at the beginning of this function we
-    // can get unlucky and appear settled when in actuallity the
+    // can get unlucky and appear settled when in actuality the
     // kernel just hasn't copied the bytes to a socket or we haven't
     // yet read the bytes and enqueued an event on a process (and the
     // process on the run queue).
@@ -2817,10 +2818,7 @@ void ProcessManager::settle()
         continue;
       }
 
-      // Read barrier for 'running'.
-      __sync_synchronize();
-
-      if (running > 0) {
+      if (running.load() > 0) {
         done = false;
         continue;
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process_reference.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp
index f8df4a6..e6110bb 100644
--- a/3rdparty/libprocess/src/process_reference.hpp
+++ b/3rdparty/libprocess/src/process_reference.hpp
@@ -66,7 +66,7 @@ private:
     : process(_process)
   {
     if (process != NULL) {
-      __sync_fetch_and_add(&(process->refs), 1);
+      process->refs.fetch_add(1);
     }
   }
 
@@ -78,15 +78,15 @@ private:
       // There should be at least one reference to the process, so
       // we don't need to worry about checking if it's exiting or
       // not, since we know we can always create another reference.
-      CHECK(process->refs > 0);
-      __sync_fetch_and_add(&(process->refs), 1);
+      CHECK(process->refs.load() > 0);
+      process->refs.fetch_add(1);
     }
   }
 
   void cleanup()
   {
     if (process != NULL) {
-      __sync_fetch_and_sub(&(process->refs), 1);
+      process->refs.fetch_sub(1);
     }
   }
 


[4/4] mesos git commit: mesos: Update style guide for usage of std::atomic.

Posted by jo...@apache.org.
mesos: Update style guide for usage of std::atomic.

mesos: Update style guide for usage of std::atomic.

Review: https://reviews.apache.org/r/38265


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94117491
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94117491
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94117491

Branch: refs/heads/master
Commit: 941174914d309477b05865078ecf8a0f69db04f8
Parents: 81efd72
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:04 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 docs/mesos-c++-style-guide.md | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94117491/docs/mesos-c++-style-guide.md
----------------------------------------------------------------------
diff --git a/docs/mesos-c++-style-guide.md b/docs/mesos-c++-style-guide.md
index 5e4d13e..f1ed32a 100644
--- a/docs/mesos-c++-style-guide.md
+++ b/docs/mesos-c++-style-guide.md
@@ -259,6 +259,9 @@ Try<Owned<LocalAuthorizer>> authorizer = LocalAuthorizer::create();
   * `std::mutex`
   * `std::lock_guard<std::mutex>`
   * `std::unique_lock<std::mutex>`
+* Atomics (`std::atomic`)
+  * The standard defines a number of predefined typedefs for atomic types (e.g., `std::atomic_int`), in addition to `std::atomic<T>`. When a typedef is available, it should be preferred over explicit template specialization of `std::atomic<T>`.
+  * When reading from and writing to atomic values, the `load` and `store` member functions should be used instead of the overloads of `operator T()` and `operator=`. Being explicit helps to draw the reader's attention to the fact that atomic values are being manipulated.
 * Shared from this.
   * `class T : public std::enable_shared_from_this<T>`
   * `shared_from_this()`


[2/4] mesos git commit: stout: Replace GCC intrinsics with std::atomic.

Posted by jo...@apache.org.
stout: Replace GCC intrinsics with std::atomic.

MESOS-3326.

Review: https://reviews.apache.org/r/37876


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a01850c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a01850c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a01850c

Branch: refs/heads/master
Commit: 4a01850c554048cfa53ca03d7d6e3cf901ce166d
Parents: 9411749
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:13 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 .../3rdparty/stout/include/stout/os/posix/fork.hpp  | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4a01850c/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
index d43433a..7eb51e8 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
@@ -21,6 +21,7 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 
+#include <atomic>
 #include <list>
 #include <memory>
 #include <set>
@@ -232,7 +233,7 @@ private:
       pid_t group;
       pid_t session;
 
-      bool set; // Has this been initialized?
+      std::atomic_bool set; // Has this been initialized?
     };
 
     std::shared_ptr<Memory> memory;
@@ -274,11 +275,11 @@ private:
   // Constructs a Tree (see above) from this fork template.
   Try<Tree> prepare() const
   {
-    static int forks = 0;
+    static std::atomic_int forks(0);
 
     // Each "instance" of an instantiated Fork needs a unique name for
     // creating shared memory.
-    int instance = __sync_fetch_and_add(&forks, 1);
+    int instance = forks.fetch_add(1);
 
     std::string name =
       "/stout-forks-" + stringify(getpid()) + stringify(instance);
@@ -312,7 +313,7 @@ private:
 
     Tree tree;
     tree.memory = std::shared_ptr<Tree::Memory>((Tree::Memory*)memory, deleter);
-    tree.memory->set = false;
+    tree.memory->set.store(false);
 
     for (size_t i = 0; i < children.size(); i++) {
       Try<Tree> tree_ = children[i].prepare();
@@ -340,7 +341,7 @@ private:
     process.parent = getppid();
     process.group = getpgid(0);
     process.session = getsid(0);
-    process.set = true;
+    process.set.store(true);
 
     // Copy it into shared memory.
     memcpy(tree.memory.get(), &process, sizeof(Tree::Memory));
@@ -381,10 +382,7 @@ private:
   {
     // Wait for the forked process.
     // TODO(benh): Don't wait forever?
-    while (!tree.memory->set) {
-      // Make sure we don't keep reading the value from a register.
-      __sync_synchronize();
-    }
+    while (!tree.memory->set.load());
 
     // All processes in the returned ProcessTree will have the
     // command-line of the top level process, since we construct the