You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/11 04:40:01 UTC
[1/4] mesos git commit: mesos: Replace volatile with std::atomic.
Repository: mesos
Updated Branches:
refs/heads/master 81efd727e -> 87de003c6
mesos: Replace volatile with std::atomic.
MESOS-3326.
Review: https://reviews.apache.org/r/37878
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87de003c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87de003c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87de003c
Branch: refs/heads/master
Commit: 87de003c6e8a4dfc2d29ae8b0aab1f83ff0c66a3
Parents: 4b93805
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:33 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700
----------------------------------------------------------------------
src/exec/exec.cpp | 32 +++++++++++++++----------------
src/sched/sched.cpp | 50 +++++++++++++++++++++++-------------------------
2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 31e0c2f..7b51baa 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -20,6 +20,7 @@
#include <sys/types.h>
+#include <atomic>
#include <iostream>
#include <string>
#include <sstream>
@@ -198,7 +199,7 @@ protected:
const SlaveID& slaveId,
const SlaveInfo& slaveInfo)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring registered message from slave " << slaveId
<< " because the driver is aborted!";
return;
@@ -221,7 +222,7 @@ protected:
void reregistered(const SlaveID& slaveId, const SlaveInfo& slaveInfo)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring re-registered message from slave " << slaveId
<< " because the driver is aborted!";
return;
@@ -244,7 +245,7 @@ protected:
void reconnect(const UPID& from, const SlaveID& slaveId)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring reconnect message from slave " << slaveId
<< " because the driver is aborted!";
return;
@@ -280,7 +281,7 @@ protected:
void runTask(const TaskInfo& task)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring run task message for task " << task.task_id()
<< " because the driver is aborted!";
return;
@@ -305,7 +306,7 @@ protected:
void killTask(const TaskID& taskId)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring kill task message for task " << taskId
<<" because the driver is aborted!";
return;
@@ -329,7 +330,7 @@ protected:
const TaskID& taskId,
const string& uuid)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring status update acknowledgement "
<< UUID::fromBytes(uuid) << " for task " << taskId
<< " of framework " << frameworkId
@@ -353,7 +354,7 @@ protected:
const ExecutorID& executorId,
const string& data)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring framework message because the driver is aborted!";
return;
}
@@ -372,7 +373,7 @@ protected:
void shutdown()
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring shutdown message because the driver is aborted!";
return;
}
@@ -394,7 +395,7 @@ protected:
VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
- aborted = true; // To make sure not to accept any new messages.
+ aborted.store(true); // To make sure not to accept any new messages.
if (local) {
terminate(this);
@@ -413,7 +414,7 @@ protected:
void abort()
{
LOG(INFO) << "Deactivating the executor libprocess";
- CHECK(aborted);
+ CHECK(aborted.load());
synchronized (mutex) {
CHECK_NOTNULL(latch)->trigger();
@@ -439,7 +440,7 @@ protected:
virtual void exited(const UPID& pid)
{
- if (aborted) {
+ if (aborted.load()) {
VLOG(1) << "Ignoring exited event because the driver is aborted!";
return;
}
@@ -478,7 +479,7 @@ protected:
VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
- aborted = true; // To make sure not to accept any new messages.
+ aborted.store(true); // To make sure not to accept any new messages.
// This is a pretty bad state ... no slave is left. Rather
// than exit lets kill our process group (which includes
@@ -543,7 +544,7 @@ private:
bool connected; // Registered with the slave.
UUID connection; // UUID to identify the connection instance.
bool local;
- volatile bool aborted;
+ std::atomic_bool aborted;
std::recursive_mutex* mutex;
Latch* latch;
const string directory;
@@ -750,12 +751,11 @@ Status MesosExecutorDriver::abort()
CHECK(process != NULL);
- // We set the volatile aborted to true here to prevent any further
+ // We set the atomic aborted to true here to prevent any further
// messages from being processed in the ExecutorProcess. However,
// if abort() is called from another thread as the ExecutorProcess,
// there may be at most one additional message processed.
- // TODO(bmahler): Use an atomic boolean.
- process->aborted = true;
+ process->aborted.store(true);
// Dispatching here ensures that we still process the outstanding
// requests *from* the executor, since those do proceed when
http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 012af05..1fc9e73 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -221,7 +221,7 @@ protected:
void detected(const Future<Option<MasterInfo> >& _master)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring the master change because the driver is not"
<< " running!";
return;
@@ -292,7 +292,7 @@ protected:
void authenticate()
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring authenticate because the driver is not running!";
return;
}
@@ -360,7 +360,7 @@ protected:
void _authenticate()
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring _authenticate because the driver is not running!";
return;
}
@@ -415,7 +415,7 @@ protected:
void authenticationTimeout(Future<bool> future)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring authentication timeout because "
<< "the driver is not running!";
return;
@@ -617,7 +617,7 @@ protected:
const FrameworkID& frameworkId,
const MasterInfo& masterInfo)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring framework registered message because "
<< "the driver is not running!";
return;
@@ -659,7 +659,7 @@ protected:
const FrameworkID& frameworkId,
const MasterInfo& masterInfo)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring framework re-registered message because "
<< "the driver is not running!";
return;
@@ -698,7 +698,7 @@ protected:
void doReliableRegistration(Duration maxBackoff)
{
- if (!running) {
+ if (!running.load()) {
return;
}
@@ -755,7 +755,7 @@ protected:
const vector<Offer>& offers,
const vector<string>& pids)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring resource offers message because "
<< "the driver is not running!";
return;
@@ -805,7 +805,7 @@ protected:
void rescindOffer(const UPID& from, const OfferID& offerId)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring rescind offer message because "
<< "the driver is not running!";
return;
@@ -845,7 +845,7 @@ protected:
const StatusUpdate& update,
const UPID& pid)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring task status update message because "
<< "the driver is not running!";
return;
@@ -910,10 +910,10 @@ protected:
VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
if (implicitAcknowledgements) {
- // Note that we need to look at the volatile 'running' here
+ // Note that we need to look at the atomic 'running' here
// so that we don't acknowledge the update if the driver was
// aborted during the processing of the update.
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Not sending status update acknowledgment message because "
<< "the driver is not running!";
return;
@@ -948,7 +948,7 @@ protected:
void lostSlave(const UPID& from, const SlaveID& slaveId)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring lost slave message because the driver is not"
<< " running!";
return;
@@ -988,7 +988,7 @@ protected:
const ExecutorID& executorId,
const string& data)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1)
<< "Ignoring framework message because the driver is not running!";
return;
@@ -1008,7 +1008,7 @@ protected:
void error(const string& message)
{
- if (!running) {
+ if (!running.load()) {
VLOG(1) << "Ignoring error message because the driver is not running!";
return;
}
@@ -1061,7 +1061,7 @@ protected:
{
LOG(INFO) << "Aborting framework '" << framework.id() << "'";
- CHECK(!running);
+ CHECK(!running.load());
if (!connected) {
VLOG(1) << "Not sending a deactivate message as master is disconnected";
@@ -1248,11 +1248,11 @@ protected:
return;
}
- // NOTE: By ignoring the volatile 'running' here, we ensure that
- // all acknowledgements requested before the driver was stopped
- // or aborted are processed. Any acknowledgement that is requested
- // after the driver stops or aborts (running == false) will be
- // dropped in the driver before reaching here.
+ // NOTE: By ignoring the atomic 'running' here, we ensure that all
+ // acknowledgements requested before the driver was stopped or
+ // aborted are processed. Any acknowledgement that is requested
+ // after the driver stops or aborts (running.load() == false) will
+ // be dropped in the driver before reaching here.
// Only statuses with a 'uuid' and a 'slave_id' need to have
// acknowledgements sent to the master. Note that the driver
@@ -1427,9 +1427,7 @@ private:
// there may be one additional callback delivered to the scheduler.
// This could happen if the SchedulerProcess is in the middle of
// processing an event.
- // TODO(vinod): Instead of 'volatile' use std::atomic() to guarantee
- // atomicity.
- volatile bool running; // Flag to indicate if the driver is running.
+ std::atomic_bool running; // Flag to indicate if the driver is running.
MasterDetector* detector;
@@ -1757,7 +1755,7 @@ Status MesosSchedulerDriver::stop(bool failover)
// it due to bad parameters (e.g. error in creating the detector
// or loading flags).
if (process != NULL) {
- process->running = false;
+ process->running.store(false);
dispatch(process, &SchedulerProcess::stop, failover);
}
@@ -1788,7 +1786,7 @@ Status MesosSchedulerDriver::abort()
}
CHECK_NOTNULL(process);
- process->running = false;
+ process->running.store(false);
// Dispatching here ensures that we still process the outstanding
// requests *from* the scheduler, since those do proceed when
[3/4] mesos git commit: libprocess: Replace GCC intrinsics and
volatile with std::atomic.
Posted by jo...@apache.org.
libprocess: Replace GCC intrinsics and volatile with std::atomic.
MESOS-3326.
Review: https://reviews.apache.org/r/37877
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b938052
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b938052
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b938052
Branch: refs/heads/master
Commit: 4b938052b6af124eb1fdaec9b597c620627677ea
Parents: 4a01850
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:22 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/latch.hpp | 4 +-
.../include/process/metrics/counter.hpp | 15 +++---
3rdparty/libprocess/include/process/process.hpp | 2 +-
3rdparty/libprocess/src/clock.cpp | 5 +-
3rdparty/libprocess/src/latch.cpp | 15 +++---
3rdparty/libprocess/src/process.cpp | 52 ++++++++++----------
3rdparty/libprocess/src/process_reference.hpp | 8 +--
7 files changed, 51 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/latch.hpp b/3rdparty/libprocess/include/process/latch.hpp
index a1a2227..8a9d121 100644
--- a/3rdparty/libprocess/include/process/latch.hpp
+++ b/3rdparty/libprocess/include/process/latch.hpp
@@ -15,6 +15,8 @@
#ifndef __PROCESS_LATCH_HPP__
#define __PROCESS_LATCH_HPP__
+#include <atomic>
+
#include <process/pid.hpp>
#include <stout/duration.hpp>
@@ -43,7 +45,7 @@ private:
Latch(const Latch& that);
Latch& operator=(const Latch& that);
- bool triggered;
+ std::atomic_bool triggered;
UPID pid;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/metrics/counter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp
index e51a8be..fd8be32 100644
--- a/3rdparty/libprocess/include/process/metrics/counter.hpp
+++ b/3rdparty/libprocess/include/process/metrics/counter.hpp
@@ -35,19 +35,20 @@ public:
: Metric(name, window),
data(new Data())
{
- push(data->v);
+ push(data->value.load());
}
virtual ~Counter() {}
virtual Future<double> value() const
{
- return static_cast<double>(data->v);
+ return static_cast<double>(data->value.load());
}
void reset()
{
- push(__sync_and_and_fetch(&data->v, 0));
+ data->value.store(0);
+ push(0);
}
Counter& operator++()
@@ -64,17 +65,17 @@ public:
Counter& operator+=(int64_t v)
{
- push(__sync_add_and_fetch(&data->v, v));
+ int64_t prev = data->value.fetch_add(v);
+ push(prev + v);
return *this;
}
private:
struct Data
{
- explicit Data() : v(0) {}
+ explicit Data() : value(0) {}
- // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands.
- volatile int64_t v;
+ std::atomic<int64_t> value;
};
std::shared_ptr<Data> data;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index cc8317f..8b086f2 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -332,7 +332,7 @@ private:
std::deque<Event*> events;
// Active references.
- int refs;
+ std::atomic_long refs;
// Process PID.
UPID pid;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 09c60e5..5806098 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -247,14 +247,15 @@ Timer Clock::timer(
const Duration& duration,
const lambda::function<void(void)>& thunk)
{
- static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
+ // Start at 1 since Timer() instances use id 0.
+ static std::atomic<uint64_t> id(1);
// Assumes Clock::now() does Clock::now(__process__).
Timeout timeout = Timeout::in(duration);
UPID pid = __process__ != NULL ? __process__->self() : UPID();
- Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
+ Timer timer(id.fetch_add(1), timeout, pid, thunk);
VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
<< " in the future (" << timeout.time() << ")";
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/latch.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/latch.cpp b/3rdparty/libprocess/src/latch.cpp
index f7d94d9..f433a05 100644
--- a/3rdparty/libprocess/src/latch.cpp
+++ b/3rdparty/libprocess/src/latch.cpp
@@ -25,10 +25,8 @@ namespace process {
// within libprocess such that it doesn't cost a memory allocation, a
// spawn, a message send, a wait, and two user-space context-switchs.
-Latch::Latch()
+Latch::Latch() : triggered(false)
{
- triggered = false;
-
// Deadlock is possible if one thread is trying to delete a latch
// but the libprocess thread(s) is trying to acquire a resource the
// deleting thread is holding. Hence, we only save the PID for
@@ -40,7 +38,8 @@ Latch::Latch()
Latch::~Latch()
{
- if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+ bool expected = false;
+ if (triggered.compare_exchange_strong(expected, true)) {
terminate(pid);
}
}
@@ -48,8 +47,8 @@ Latch::~Latch()
bool Latch::trigger()
{
- // TODO(benh): Use std::atomic when C++11 rolls out.
- if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+ bool expected = false;
+ if (triggered.compare_exchange_strong(expected, true)) {
terminate(pid);
return true;
}
@@ -59,7 +58,7 @@ bool Latch::trigger()
bool Latch::await(const Duration& duration)
{
- if (!triggered) {
+ if (!triggered.load()) {
process::wait(pid, duration); // Explict to disambiguate.
// It's possible that we failed to wait because:
// (1) Our process has already terminated.
@@ -71,7 +70,7 @@ bool Latch::await(const Duration& duration)
// 'triggered' (which will also capture cases where we actually
// timed out but have since triggered, which seems like an
// acceptable semantics given such a "tie").
- return triggered;
+ return triggered.load();
}
return true;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0e5394a..4afa305 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -427,7 +427,7 @@ private:
std::recursive_mutex runq_mutex;
// Number of running processes, to support Clock::settle operation.
- int running;
+ std::atomic_long running;
// List of rules applied to all incoming HTTP requests.
vector<Owned<FirewallRule>> firewallRules;
@@ -746,23 +746,26 @@ void install(vector<Owned<FirewallRule>>&& rules)
void initialize(const string& delegate)
{
// TODO(benh): Return an error if attempting to initialize again
- // with a different delegate then originally specified.
+ // with a different delegate than originally specified.
// static pthread_once_t init = PTHREAD_ONCE_INIT;
// pthread_once(&init, ...);
- static volatile bool initialized = false;
- static volatile bool initializing = true;
+ static std::atomic_bool initialized(false);
+ static std::atomic_bool initializing(true);
// Try and do the initialization or wait for it to complete.
- if (initialized && !initializing) {
+ // TODO(neilc): Try to simplify and/or document this logic.
+ if (initialized.load() && !initializing.load()) {
return;
- } else if (initialized && initializing) {
- while (initializing);
+ } else if (initialized.load() && initializing.load()) {
+ while (initializing.load());
return;
} else {
- if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
- while (initializing);
+ // `compare_exchange_strong` needs an lvalue.
+ bool expected = false;
+ if (!initialized.compare_exchange_strong(expected, true)) {
+ while (initializing.load());
return;
}
}
@@ -945,9 +948,9 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
- // Need to set initialzing here so that we can actually invoke
- // 'spawn' below for the garbage collector.
- initializing = false;
+ // Need to set `initializing` here so that we can actually invoke `spawn()`
+ // below for the garbage collector.
+ initializing.store(false);
__s__->accept()
.onAny(lambda::bind(&internal::on_accept, lambda::_1));
@@ -998,7 +1001,7 @@ void finalize()
{
delete process_manager;
- // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt
+ // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt
// to dereference 'process_manager' in the 'timedout' callback.
}
@@ -2111,8 +2114,7 @@ void SocketManager::swap_implementing_socket(const Socket& from, Socket* to)
ProcessManager::ProcessManager(const string& _delegate)
: delegate(_delegate)
{
- running = 0;
- __sync_synchronize(); // Ensure write to 'running' visible in other threads.
+ running.store(0);
}
@@ -2485,8 +2487,8 @@ void ProcessManager::resume(ProcessBase* process)
__process__ = NULL;
- CHECK_GE(running, 1);
- __sync_fetch_and_sub(&running, 1);
+ CHECK_GE(running.load(), 1);
+ running.fetch_sub(1);
}
@@ -2525,11 +2527,10 @@ void ProcessManager::cleanup(ProcessBase* process)
// Remove process.
synchronized (processes_mutex) {
// Wait for all process references to get cleaned up.
- while (process->refs > 0) {
+ while (process->refs.load() > 0) {
#if defined(__i386__) || defined(__x86_64__)
asm ("pause");
#endif
- __sync_synchronize();
}
synchronized (process->mutex) {
@@ -2545,7 +2546,7 @@ void ProcessManager::cleanup(ProcessBase* process)
gates.erase(it);
}
- CHECK(process->refs == 0);
+ CHECK(process->refs.load() == 0);
process->state = ProcessBase::TERMINATED;
}
@@ -2672,7 +2673,7 @@ bool ProcessManager::wait(const UPID& pid)
// 'runq' and 'running' equal to 0 between when we exit
// this critical section and increment 'running').
runq.erase(it);
- __sync_fetch_and_add(&running, 1);
+ running.fetch_add(1);
} else {
// Another thread has resumed the process ...
process = NULL;
@@ -2783,7 +2784,7 @@ ProcessBase* ProcessManager::dequeue()
// Increment the running count of processes in order to support
// the Clock::settle() operation (this must be done atomically
// with removing the process from the runq).
- __sync_fetch_and_add(&running, 1);
+ running.fetch_add(1);
}
}
@@ -2803,7 +2804,7 @@ void ProcessManager::settle()
// expect the http::get will have properly enqueued a process on
// the run queue but http::get is just sending bytes on a
// socket. Without sleeping at the beginning of this function we
- // can get unlucky and appear settled when in actuallity the
+ // can get unlucky and appear settled when in actuality the
// kernel just hasn't copied the bytes to a socket or we haven't
// yet read the bytes and enqueued an event on a process (and the
// process on the run queue).
@@ -2817,10 +2818,7 @@ void ProcessManager::settle()
continue;
}
- // Read barrier for 'running'.
- __sync_synchronize();
-
- if (running > 0) {
+ if (running.load() > 0) {
done = false;
continue;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process_reference.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp
index f8df4a6..e6110bb 100644
--- a/3rdparty/libprocess/src/process_reference.hpp
+++ b/3rdparty/libprocess/src/process_reference.hpp
@@ -66,7 +66,7 @@ private:
: process(_process)
{
if (process != NULL) {
- __sync_fetch_and_add(&(process->refs), 1);
+ process->refs.fetch_add(1);
}
}
@@ -78,15 +78,15 @@ private:
// There should be at least one reference to the process, so
// we don't need to worry about checking if it's exiting or
// not, since we know we can always create another reference.
- CHECK(process->refs > 0);
- __sync_fetch_and_add(&(process->refs), 1);
+ CHECK(process->refs.load() > 0);
+ process->refs.fetch_add(1);
}
}
void cleanup()
{
if (process != NULL) {
- __sync_fetch_and_sub(&(process->refs), 1);
+ process->refs.fetch_sub(1);
}
}
[4/4] mesos git commit: mesos: Update style guide for usage of
std::atomic.
Posted by jo...@apache.org.
mesos: Update style guide for usage of std::atomic.
mesos: Update style guide for usage of std::atomic.
Review: https://reviews.apache.org/r/38265
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94117491
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94117491
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94117491
Branch: refs/heads/master
Commit: 941174914d309477b05865078ecf8a0f69db04f8
Parents: 81efd72
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:04 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700
----------------------------------------------------------------------
docs/mesos-c++-style-guide.md | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/94117491/docs/mesos-c++-style-guide.md
----------------------------------------------------------------------
diff --git a/docs/mesos-c++-style-guide.md b/docs/mesos-c++-style-guide.md
index 5e4d13e..f1ed32a 100644
--- a/docs/mesos-c++-style-guide.md
+++ b/docs/mesos-c++-style-guide.md
@@ -259,6 +259,9 @@ Try<Owned<LocalAuthorizer>> authorizer = LocalAuthorizer::create();
* `std::mutex`
* `std::lock_guard<std::mutex>`
* `std::unique_lock<std::mutex>`
+* Atomics (`std::atomic`)
+ * The standard defines a number of predefined typedefs for atomic types (e.g., `std::atomic_int`), in addition to `std::atomic<T>`. When a typedef is available, it should be preferred over explicit template specialization of `std::atomic<T>`.
+ * When reading from and writing to atomic values, the `load` and `store` member functions should be used instead of the overloads of `operator T()` and `operator=`. Being explicit helps to draw the reader's attention to the fact that atomic values are being manipulated.
* Shared from this.
* `class T : public std::enable_shared_from_this<T>`
* `shared_from_this()`
[2/4] mesos git commit: stout: Replace GCC intrinsics with
std::atomic.
Posted by jo...@apache.org.
stout: Replace GCC intrinsics with std::atomic.
MESOS-3326.
Review: https://reviews.apache.org/r/37876
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a01850c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a01850c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a01850c
Branch: refs/heads/master
Commit: 4a01850c554048cfa53ca03d7d6e3cf901ce166d
Parents: 9411749
Author: Neil Conway <ne...@gmail.com>
Authored: Thu Sep 10 17:50:13 2015 -0700
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Thu Sep 10 19:39:41 2015 -0700
----------------------------------------------------------------------
.../3rdparty/stout/include/stout/os/posix/fork.hpp | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a01850c/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
index d43433a..7eb51e8 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/posix/fork.hpp
@@ -21,6 +21,7 @@
#include <sys/types.h>
#include <sys/wait.h>
+#include <atomic>
#include <list>
#include <memory>
#include <set>
@@ -232,7 +233,7 @@ private:
pid_t group;
pid_t session;
- bool set; // Has this been initialized?
+ std::atomic_bool set; // Has this been initialized?
};
std::shared_ptr<Memory> memory;
@@ -274,11 +275,11 @@ private:
// Constructs a Tree (see above) from this fork template.
Try<Tree> prepare() const
{
- static int forks = 0;
+ static std::atomic_int forks(0);
// Each "instance" of an instantiated Fork needs a unique name for
// creating shared memory.
- int instance = __sync_fetch_and_add(&forks, 1);
+ int instance = forks.fetch_add(1);
std::string name =
"/stout-forks-" + stringify(getpid()) + stringify(instance);
@@ -312,7 +313,7 @@ private:
Tree tree;
tree.memory = std::shared_ptr<Tree::Memory>((Tree::Memory*)memory, deleter);
- tree.memory->set = false;
+ tree.memory->set.store(false);
for (size_t i = 0; i < children.size(); i++) {
Try<Tree> tree_ = children[i].prepare();
@@ -340,7 +341,7 @@ private:
process.parent = getppid();
process.group = getpgid(0);
process.session = getsid(0);
- process.set = true;
+ process.set.store(true);
// Copy it into shared memory.
memcpy(tree.memory.get(), &process, sizeof(Tree::Memory));
@@ -381,10 +382,7 @@ private:
{
// Wait for the forked process.
// TODO(benh): Don't wait forever?
- while (!tree.memory->set) {
- // Make sure we don't keep reading the value from a register.
- __sync_synchronize();
- }
+ while (!tree.memory->set.load());
// All processes in the returned ProcessTree will have the
// command-line of the top level process, since we construct the