You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:19 UTC
[01/21] mesos git commit: stout: Introduce CHECK_NONE and CHECK_ERROR.
Repository: mesos
Updated Branches:
refs/heads/master 763364e86 -> 26527296f
stout: Introduce CHECK_NONE and CHECK_ERROR.
Review: https://reviews.apache.org/r/35422
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5f7c5a7d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5f7c5a7d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5f7c5a7d
Branch: refs/heads/master
Commit: 5f7c5a7d95aa4065e85accf13149b4eb3d798e8c
Parents: 93b60ab
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:24:24 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:42:59 2015 -0700
----------------------------------------------------------------------
.../3rdparty/stout/include/stout/check.hpp | 115 ++++++++++++++++---
1 file changed, 96 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5f7c5a7d/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
index 3972383..b550bfe 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
@@ -21,52 +21,129 @@
#include <glog/logging.h>
#include <stout/abort.hpp>
+#include <stout/error.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/result.hpp>
#include <stout/some.hpp>
#include <stout/try.hpp>
-// Provides a CHECK_SOME macro, akin to CHECK.
+// A generic macro to facilitate definitions of CHECK_*, akin to CHECK.
// This appends the error if possible to the end of the log message,
// so there's no need to append the error message explicitly.
-#define CHECK_SOME(expression) \
- for (const Option<std::string> _error = _check(expression); \
- _error.isSome();) \
- _CheckFatal(__FILE__, __LINE__, "CHECK_SOME", \
- #expression, _error.get()).stream()
+// To define a new CHECK_*, provide the name, the function that performs the
+// check, and the expression. See below for examples (e.g. CHECK_SOME).
+#define CHECK_STATE(name, check, expression) \
+ for (const Option<Error> _error = check(expression); _error.isSome();) \
+ _CheckFatal(__FILE__, \
+ __LINE__, \
+ #name, \
+ #expression, \
+ _error.get()).stream()
+
+
+#define CHECK_SOME(expression) \
+ CHECK_STATE(CHECK_SOME, _check_some, expression)
+
+
+#define CHECK_NONE(expression) \
+ CHECK_STATE(CHECK_NONE, _check_none, expression)
+
+
+#define CHECK_ERROR(expression) \
+ CHECK_STATE(CHECK_ERROR, _check_error, expression)
+
+
+// Private structs/functions used for CHECK_*.
-// Private structs/functions used for CHECK_SOME.
template <typename T>
-Option<std::string> _check(const Option<T>& o)
+Option<Error> _check_some(const Option<T>& o)
{
if (o.isNone()) {
- return Some("is NONE");
+ return Error("is NONE");
+ } else {
+ CHECK(o.isSome());
+ return None();
}
- return None();
}
template <typename T>
-Option<std::string> _check(const Try<T>& t)
+Option<Error> _check_some(const Try<T>& t)
{
if (t.isError()) {
- return t.error();
+ return Error(t.error());
+ } else {
+ CHECK(t.isSome());
+ return None();
}
- return None();
}
template <typename T>
-Option<std::string> _check(const Result<T>& r)
+Option<Error> _check_some(const Result<T>& r)
{
if (r.isError()) {
- return r.error();
+ return Error(r.error());
} else if (r.isNone()) {
- return Some("is NONE");
+ return Error("is NONE");
+ } else {
+ CHECK(r.isSome());
+ return None();
+ }
+}
+
+
+template <typename T>
+Option<Error> _check_none(const Option<T>& o)
+{
+ if (o.isSome()) {
+ return Error("is SOME");
+ } else {
+ CHECK(o.isNone());
+ return None();
+ }
+}
+
+
+template <typename T>
+Option<Error> _check_none(const Result<T>& r)
+{
+ if (r.isError()) {
+ return Error("is ERROR");
+ } else if (r.isSome()) {
+ return Error("is SOME");
+ } else {
+ CHECK(r.isNone());
+ return None();
+ }
+}
+
+
+template <typename T>
+Option<Error> _check_error(const Try<T>& t)
+{
+ if (t.isSome()) {
+ return Error("is SOME");
+ } else {
+ CHECK(t.isError());
+ return None();
+ }
+}
+
+
+template <typename T>
+Option<Error> _check_error(const Result<T>& r)
+{
+ if (r.isNone()) {
+ return Error("is NONE");
+ } else if (r.isSome()) {
+ return Error("is SOME");
+ } else {
+ CHECK(r.isError());
+ return None();
}
- return None();
}
@@ -76,11 +153,11 @@ struct _CheckFatal
int _line,
const char* type,
const char* expression,
- const std::string& error)
+ const Error& error)
: file(_file),
line(_line)
{
- out << type << "(" << expression << "): " << error << " ";
+ out << type << "(" << expression << "): " << error.message << " ";
}
~_CheckFatal()
[09/21] mesos git commit: Replace std::lock_guard with synchronized
in fetcher_cache_test.
Posted by be...@apache.org.
Replace std::lock_guard with synchronized in fetcher_cache_test.
Review: https://reviews.apache.org/r/35089
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e7f64a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e7f64a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e7f64a8
Branch: refs/heads/master
Commit: 9e7f64a8c5c2d12fb6ca5366aec2ebc826ad8fa1
Parents: eb8f88d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:55:56 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
src/tests/fetcher_cache_tests.cpp | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7f64a8/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index d6d768d..8bd5dd8 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -44,8 +44,6 @@
#include <stout/os.hpp>
#include <stout/try.hpp>
-#include "common/lock.hpp"
-
#include "master/flags.hpp"
#include "master/master.hpp"
@@ -767,19 +765,19 @@ public:
// this function via a Queue. This is currently non-trivial
// because we can't copy an HttpEvent so we're _forced_ to block
// the thread synchronously.
- std::lock_guard<std::mutex> lock(mutex);
+ synchronized (mutex) {
+ countRequests++;
- countRequests++;
+ if (strings::contains(event.request->path, COMMAND_NAME)) {
+ countCommandRequests++;
+ }
- if (strings::contains(event.request->path, COMMAND_NAME)) {
- countCommandRequests++;
- }
+ if (strings::contains(event.request->path, ARCHIVE_NAME)) {
+ countArchiveRequests++;
+ }
- if (strings::contains(event.request->path, ARCHIVE_NAME)) {
- countArchiveRequests++;
+ ProcessBase::visit(event);
}
-
- ProcessBase::visit(event);
}
void resetCounts()
[16/21] mesos git commit: Update ZooKeeper tests to use synchronized.
Posted by be...@apache.org.
Update ZooKeeper tests to use synchronized.
Review: https://reviews.apache.org/r/35101
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f4aaa143
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f4aaa143
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f4aaa143
Branch: refs/heads/master
Commit: f4aaa14392298ca5cbde260560ee32e73f9f2328
Parents: 5a69aa7
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:20:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/hook/manager.cpp | 6 ------
src/tests/zookeeper.cpp | 25 +++++++++++++------------
src/tests/zookeeper.hpp | 1 -
3 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/hook/manager.cpp
----------------------------------------------------------------------
diff --git a/src/hook/manager.cpp b/src/hook/manager.cpp
index 5236035..b43b918 100644
--- a/src/hook/manager.cpp
+++ b/src/hook/manager.cpp
@@ -121,8 +121,6 @@ Labels HookManager::masterLaunchTaskLabelDecorator(
return taskInfo_.labels();
}
-
- UNREACHABLE();
}
@@ -150,8 +148,6 @@ Labels HookManager::slaveRunTaskLabelDecorator(
return taskInfo_.labels();
}
-
- UNREACHABLE();
}
@@ -176,8 +172,6 @@ Environment HookManager::slaveExecutorEnvironmentDecorator(
return executorInfo.command().environment();
}
-
- UNREACHABLE();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/tests/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper.cpp b/src/tests/zookeeper.cpp
index 08cab86..5012017 100644
--- a/src/tests/zookeeper.cpp
+++ b/src/tests/zookeeper.cpp
@@ -33,8 +33,7 @@
#include <stout/lambda.hpp>
#include <stout/path.hpp>
#include <stout/os.hpp>
-
-#include "common/lock.hpp"
+#include <stout/synchronized.hpp>
#include "logging/logging.hpp"
@@ -121,9 +120,10 @@ void ZooKeeperTest::TestWatcher::process(
int64_t sessionId,
const string& path)
{
- Lock lock(&mutex);
- events.push(Event(type, state, path));
- pthread_cond_signal(&cond);
+ synchronized (mutex) {
+ events.push(Event(type, state, path));
+ pthread_cond_signal(&cond);
+ }
}
@@ -158,14 +158,15 @@ void ZooKeeperTest::TestWatcher::awaitCreated(const string& path)
ZooKeeperTest::TestWatcher::Event
ZooKeeperTest::TestWatcher::awaitEvent()
{
- Lock lock(&mutex);
- while (true) {
- while (events.empty()) {
- pthread_cond_wait(&cond, &mutex);
+ synchronized (mutex) {
+ while (true) {
+ while (events.empty()) {
+ pthread_cond_wait(&cond, &mutex);
+ }
+ Event event = events.front();
+ events.pop();
+ return event;
}
- Event event = events.front();
- events.pop();
- return event;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/tests/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper.hpp b/src/tests/zookeeper.hpp
index d8f1cb3..32fc83b 100644
--- a/src/tests/zookeeper.hpp
+++ b/src/tests/zookeeper.hpp
@@ -19,7 +19,6 @@
#ifndef __TESTS_ZOOKEEPER_HPP__
#define __TESTS_ZOOKEEPER_HPP__
-#include <pthread.h>
#include <stdint.h>
#include <gtest/gtest.h>
[15/21] mesos git commit: Update linux fs to use synchronized.
Posted by be...@apache.org.
Update linux fs to use synchronized.
Review: https://reviews.apache.org/r/35095
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5b0eeb0b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5b0eeb0b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5b0eeb0b
Branch: refs/heads/master
Commit: 5b0eeb0b761343e61de5593fba062ee3229dad1a
Parents: eb33a57
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:05:33 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/linux/fs.cpp | 14 +++++---------
1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5b0eeb0b/src/linux/fs.cpp
----------------------------------------------------------------------
diff --git a/src/linux/fs.cpp b/src/linux/fs.cpp
index 1c9cf3f..568565f 100644
--- a/src/linux/fs.cpp
+++ b/src/linux/fs.cpp
@@ -26,12 +26,11 @@
#include <stout/numify.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
#include <stout/os/read.hpp>
#include <stout/os/stat.hpp>
-#include "common/lock.hpp"
-
#include "linux/fs.hpp"
using std::string;
@@ -188,10 +187,9 @@ Try<MountTable> MountTable::read(const string& path)
// Mutex for guarding calls into non-reentrant mount table
// functions. We use a static local variable to avoid unused
// variable warnings.
- static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ static std::mutex mutex;
- {
- Lock lock(&mutex);
+ synchronized (mutex) {
struct mntent* mntent = ::getmntent(file);
if (mntent == NULL) {
// NULL means the end of entries.
@@ -219,14 +217,12 @@ Try<FileSystemTable> FileSystemTable::read()
{
// Mutex for guarding calls into non-reentrant fstab functions. We
// use a static local variable to avoid unused variable warnings.
- static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ static std::mutex mutex;
FileSystemTable table;
// Use locks since fstab functions are not thread-safe.
- {
- Lock lock(&mutex);
-
+ synchronized (mutex) {
// Open file _PATH_FSTAB (/etc/fstab).
if (::setfsent() == 0) {
return Error("Failed to open file system table");
[13/21] mesos git commit: Update Mesos scheduler to use synchronized.
Posted by be...@apache.org.
Update Mesos scheduler to use synchronized.
Review: https://reviews.apache.org/r/35096
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b2d80474
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b2d80474
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b2d80474
Branch: refs/heads/master
Commit: b2d8047428228cbbea65f4af889d11e8918e2e96
Parents: 5b0eeb0
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:06:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
include/mesos/scheduler.hpp | 5 +-
src/sched/sched.cpp | 382 +++++++++++++++++++--------------------
2 files changed, 193 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ee6b5c..0b54ffe 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,11 +19,10 @@
#ifndef __MESOS_SCHEDULER_HPP__
#define __MESOS_SCHEDULER_HPP__
-#include <functional>
-#include <queue>
-
#include <pthread.h>
+#include <functional>
+#include <queue>
#include <string>
#include <vector>
http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9423607..bc76c71 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -71,8 +71,6 @@
#include "authentication/cram_md5/authenticatee.hpp"
-#include "common/lock.hpp"
-
#include "local/flags.hpp"
#include "local/local.hpp"
@@ -841,8 +839,9 @@ protected:
send(master.get(), message);
}
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
// NOTE: This function informs the master to stop attempting to send
@@ -866,8 +865,9 @@ protected:
send(master.get(), message);
}
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void killTask(const TaskID& taskId)
@@ -1507,156 +1507,156 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
Status MesosSchedulerDriver::start()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_NOT_STARTED) {
- return status;
- }
-
- if (detector == NULL) {
- Try<MasterDetector*> detector_ = MasterDetector::create(url);
-
- if (detector_.isError()) {
- status = DRIVER_ABORTED;
- string message = "Failed to create a master detector for '" +
- master + "': " + detector_.error();
- scheduler->error(this, message);
+ synchronized (mutex) {
+ if (status != DRIVER_NOT_STARTED) {
return status;
}
- // Save the detector so we can delete it later.
- detector = detector_.get();
- }
+ if (detector == NULL) {
+ Try<MasterDetector*> detector_ = MasterDetector::create(url);
- // Load scheduler flags.
- internal::scheduler::Flags flags;
- Try<Nothing> load = flags.load("MESOS_");
+ if (detector_.isError()) {
+ status = DRIVER_ABORTED;
+ string message = "Failed to create a master detector for '" +
+ master + "': " + detector_.error();
+ scheduler->error(this, message);
+ return status;
+ }
- if (load.isError()) {
- status = DRIVER_ABORTED;
- scheduler->error(this, load.error());
- return status;
- }
+ // Save the detector so we can delete it later.
+ detector = detector_.get();
+ }
- // Initialize modules. Note that since other subsystems may depend
- // upon modules, we should initialize modules before anything else.
- if (flags.modules.isSome()) {
- Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
- if (result.isError()) {
+ // Load scheduler flags.
+ internal::scheduler::Flags flags;
+ Try<Nothing> load = flags.load("MESOS_");
+
+ if (load.isError()) {
status = DRIVER_ABORTED;
- scheduler->error(this, "Error loading modules: " + result.error());
+ scheduler->error(this, load.error());
return status;
}
- }
- CHECK(process == NULL);
+ // Initialize modules. Note that since other subsystems may depend
+ // upon modules, we should initialize modules before anything else.
+ if (flags.modules.isSome()) {
+ Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
+ if (result.isError()) {
+ status = DRIVER_ABORTED;
+ scheduler->error(this, "Error loading modules: " + result.error());
+ return status;
+ }
+ }
- if (credential == NULL) {
- process = new SchedulerProcess(
- this,
- scheduler,
- framework,
- None(),
- implicitAcknowlegements,
- schedulerId,
- detector,
- flags,
- &mutex,
- &cond);
- } else {
- const Credential& cred = *credential;
- process = new SchedulerProcess(
- this,
- scheduler,
- framework,
- cred,
- implicitAcknowlegements,
- schedulerId,
- detector,
- flags,
- &mutex,
- &cond);
+ CHECK(process == NULL);
+
+ if (credential == NULL) {
+ process = new SchedulerProcess(
+ this,
+ scheduler,
+ framework,
+ None(),
+ implicitAcknowlegements,
+ schedulerId,
+ detector,
+ flags,
+ &mutex,
+ &cond);
+ } else {
+ const Credential& cred = *credential;
+ process = new SchedulerProcess(
+ this,
+ scheduler,
+ framework,
+ cred,
+ implicitAcknowlegements,
+ schedulerId,
+ detector,
+ flags,
+ &mutex,
+ &cond);
+ }
+
+ spawn(process);
+
+ return status = DRIVER_RUNNING;
}
-
- spawn(process);
-
- return status = DRIVER_RUNNING;
}
Status MesosSchedulerDriver::stop(bool failover)
{
- Lock lock(&mutex);
-
- LOG(INFO) << "Asked to stop the driver";
+ synchronized (mutex) {
+ LOG(INFO) << "Asked to stop the driver";
- if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
- VLOG(1) << "Ignoring stop because the status of the driver is "
- << Status_Name(status);
- return status;
- }
+ if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+ VLOG(1) << "Ignoring stop because the status of the driver is "
+ << Status_Name(status);
+ return status;
+ }
- // 'process' might be NULL if the driver has failed to instantiate
- // it due to bad parameters (e.g. error in creating the detector
- // or loading flags).
- if (process != NULL) {
- process->running = false;
- dispatch(process, &SchedulerProcess::stop, failover);
- }
+ // 'process' might be NULL if the driver has failed to instantiate
+ // it due to bad parameters (e.g. error in creating the detector
+ // or loading flags).
+ if (process != NULL) {
+ process->running = false;
+ dispatch(process, &SchedulerProcess::stop, failover);
+ }
- // TODO(benh): It might make more sense to clean up our local
- // cluster here than in the destructor. However, what would be even
- // better is to allow multiple local clusters to exist (i.e. not use
- // global vars in local.cpp) so that ours can just be an instance
- // variable in MesosSchedulerDriver.
+ // TODO(benh): It might make more sense to clean up our local
+ // cluster here than in the destructor. However, what would be
+ // even better is to allow multiple local clusters to exist (i.e.
+ // not use global vars in local.cpp) so that ours can just be an
+ // instance variable in MesosSchedulerDriver.
- bool aborted = status == DRIVER_ABORTED;
+ bool aborted = status == DRIVER_ABORTED;
- status = DRIVER_STOPPED;
+ status = DRIVER_STOPPED;
- return aborted ? DRIVER_ABORTED : status;
+ return aborted ? DRIVER_ABORTED : status;
+ }
}
Status MesosSchedulerDriver::abort()
{
- Lock lock(&mutex);
-
- LOG(INFO) << "Asked to abort the driver";
+ synchronized (mutex) {
+ LOG(INFO) << "Asked to abort the driver";
- if (status != DRIVER_RUNNING) {
- VLOG(1) << "Ignoring abort because the status of the driver is "
- << Status_Name(status);
- return status;
- }
+ if (status != DRIVER_RUNNING) {
+ VLOG(1) << "Ignoring abort because the status of the driver is "
+ << Status_Name(status);
+ return status;
+ }
- CHECK_NOTNULL(process);
- process->running = false;
+ CHECK_NOTNULL(process);
+ process->running = false;
- // Dispatching here ensures that we still process the outstanding
- // requests *from* the scheduler, since those do proceed when
- // aborted is true.
- dispatch(process, &SchedulerProcess::abort);
+ // Dispatching here ensures that we still process the outstanding
+ // requests *from* the scheduler, since those do proceed when
+ // aborted is true.
+ dispatch(process, &SchedulerProcess::abort);
- return status = DRIVER_ABORTED;
+ return status = DRIVER_ABORTED;
+ }
}
Status MesosSchedulerDriver::join()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- while (status == DRIVER_RUNNING) {
- pthread_cond_wait(&cond, &mutex);
- }
+ while (status == DRIVER_RUNNING) {
+ pthread_cond_wait(&cond, &mutex);
+ }
- CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+ CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
- return status;
+ return status;
+ }
}
@@ -1669,17 +1669,17 @@ Status MesosSchedulerDriver::run()
Status MesosSchedulerDriver::killTask(const TaskID& taskId)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::killTask, taskId);
+ dispatch(process, &SchedulerProcess::killTask, taskId);
- return status;
+ return status;
+ }
}
@@ -1700,17 +1700,17 @@ Status MesosSchedulerDriver::launchTasks(
const vector<TaskInfo>& tasks,
const Filters& filters)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
+ dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
- return status;
+ return status;
+ }
}
@@ -1719,22 +1719,22 @@ Status MesosSchedulerDriver::acceptOffers(
const vector<Offer::Operation>& operations,
const Filters& filters)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(
- process,
- &SchedulerProcess::acceptOffers,
- offerIds,
- operations,
- filters);
+ dispatch(
+ process,
+ &SchedulerProcess::acceptOffers,
+ offerIds,
+ operations,
+ filters);
- return status;
+ return status;
+ }
}
@@ -1751,40 +1751,40 @@ Status MesosSchedulerDriver::declineOffer(
Status MesosSchedulerDriver::reviveOffers()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::reviveOffers);
+ dispatch(process, &SchedulerProcess::reviveOffers);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::acknowledgeStatusUpdate(
const TaskStatus& taskStatus)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- // TODO(bmahler): Should this use abort() instead?
- if (implicitAcknowlegements) {
- ABORT("Cannot call acknowledgeStatusUpdate:"
- " Implicit acknowledgements are enabled");
- }
+ // TODO(bmahler): Should this use abort() instead?
+ if (implicitAcknowlegements) {
+ ABORT("Cannot call acknowledgeStatusUpdate:"
+ " Implicit acknowledgements are enabled");
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+ dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
- return status;
+ return status;
+ }
}
@@ -1793,50 +1793,50 @@ Status MesosSchedulerDriver::sendFrameworkMessage(
const SlaveID& slaveId,
const string& data)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::sendFrameworkMessage,
- executorId, slaveId, data);
+ dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+ executorId, slaveId, data);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::reconcileTasks(
const vector<TaskStatus>& statuses)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
+ dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::requestResources(
const vector<Request>& requests)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::requestResources, requests);
+ dispatch(process, &SchedulerProcess::requestResources, requests);
- return status;
+ return status;
+ }
}
[02/21] mesos git commit: Forward into _Some constructor to elide
copy.
Posted by be...@apache.org.
Forward into _Some constructor to elide copy.
Review: https://reviews.apache.org/r/34517
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93b60abb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93b60abb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93b60abb
Branch: refs/heads/master
Commit: 93b60abb4b470a644fd0a8d51708d8fddf11b1a7
Parents: 763364e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 04:52:16 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:42:59 2015 -0700
----------------------------------------------------------------------
.../libprocess/3rdparty/stout/include/stout/some.hpp | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/93b60abb/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
index 1a71ac4..7c5515c 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
@@ -14,6 +14,9 @@
#ifndef __STOUT_SOME_HPP__
#define __STOUT_SOME_HPP__
+#include <type_traits>
+#include <utility>
+
// A useful type that can be used to represent an Option or Result.
//
// Examples:
@@ -30,16 +33,16 @@
template <typename T>
struct _Some
{
- _Some(T _t) : t(_t) {}
+ _Some(T _t) : t(std::move(_t)) {}
- const T t;
+ T t;
};
template <typename T>
-_Some<T> Some(T t)
+_Some<typename std::decay<T>::type> Some(T&& t)
{
- return _Some<T>(t);
+ return _Some<typename std::decay<T>::type>(std::forward<T>(t));
}
#endif // __STOUT_SOME_HPP__
[04/21] mesos git commit: Update libprocess gmock to use synchronized.
Posted by be...@apache.org.
Update libprocess gmock to use synchronized.
Review: https://reviews.apache.org/r/35091
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1a13ad8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1a13ad8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1a13ad8
Branch: refs/heads/master
Commit: e1a13ad88cae3b896194a92a9766a430ba08286f
Parents: 9cb1283
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:03:40 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/gmock.hpp | 94 +++++++++++-----------
1 file changed, 45 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1a13ad8/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index 6adc034..0fd3f8c 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -1,8 +1,6 @@
#ifndef __PROCESS_GMOCK_HPP__
#define __PROCESS_GMOCK_HPP__
-#include <pthread.h>
-
#include <gmock/gmock.h>
#include <process/dispatch.hpp>
@@ -12,6 +10,7 @@
#include <stout/exit.hpp>
#include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
// NOTE: The gmock library relies on std::tr1::tuple. The gmock
// library provides multiple possible 'tuple' implementations but it
@@ -222,19 +221,7 @@ public:
class TestsFilter : public Filter
{
public:
- TestsFilter()
- {
- // We use a recursive mutex here in the event that satisfying the
- // future created in FutureMessage or FutureDispatch via the
- // FutureArgField or FutureSatisfy actions invokes callbacks (from
- // Future::then or Future::onAny, etc) that themselves invoke
- // FutureDispatch or FutureMessage.
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&mutex, &attr);
- pthread_mutexattr_destroy(&attr);
- }
+ TestsFilter() = default;
virtual bool filter(const MessageEvent& event) { return handle(event); }
virtual bool filter(const DispatchEvent& event) { return handle(event); }
@@ -244,14 +231,19 @@ public:
template <typename T>
bool handle(const T& t)
{
- pthread_mutex_lock(&mutex);
- bool drop = mock.filter(t);
- pthread_mutex_unlock(&mutex);
- return drop;
+ synchronized (mutex) {
+ return mock.filter(t);
+ }
}
MockFilter mock;
- pthread_mutex_t mutex;
+
+ // We use a recursive mutex here in the event that satisfying the
+ // future created in FutureMessage or FutureDispatch via the
+ // FutureArgField or FutureSatisfy actions invokes callbacks (from
+ // Future::then or Future::onAny, etc) that themselves invoke
+ // FutureDispatch or FutureMessage.
+ std::recursive_mutex mutex;
};
@@ -337,14 +329,17 @@ template <typename Name, typename From, typename To>
Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
Future<Message> future;
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
- .WillOnce(testing::DoAll(FutureArgField<0>(&MessageEvent::message, &future),
- testing::Return(drop)))
- .RetiresOnSaturation(); // Don't impose any subsequent expectations.
- pthread_mutex_unlock(&filter->mutex);
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from, to))
+ .WillOnce(testing::DoAll(FutureArgField<0>(
+ &MessageEvent::message,
+ &future),
+ testing::Return(drop)))
+ .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+ }
+
return future;
}
@@ -353,14 +348,15 @@ template <typename PID, typename Method>
Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
Future<Nothing> future;
- EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
- .With(DispatchMatcher(pid, method))
- .WillOnce(testing::DoAll(FutureSatisfy(&future),
- testing::Return(drop)))
- .RetiresOnSaturation(); // Don't impose any subsequent expectations.
- pthread_mutex_unlock(&filter->mutex);
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(pid, method))
+ .WillOnce(testing::DoAll(FutureSatisfy(&future),
+ testing::Return(drop)))
+ .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+ }
+
return future;
}
@@ -369,11 +365,11 @@ template <typename Name, typename From, typename To>
void DropMessages(Name name, From from, To to)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
- .WillRepeatedly(testing::Return(true));
- pthread_mutex_unlock(&filter->mutex);
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from, to))
+ .WillRepeatedly(testing::Return(true));
+ }
}
@@ -381,11 +377,11 @@ template <typename Name, typename From, typename To>
void ExpectNoFutureMessages(Name name, From from, To to)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
- EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
- .With(MessageMatcher(name, from, to))
- .Times(0);
- pthread_mutex_unlock(&filter->mutex);
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(MessageMatcher(name, from, to))
+ .Times(0);
+ }
}
@@ -393,11 +389,11 @@ template <typename PID, typename Method>
void DropDispatches(PID pid, Method method)
{
TestsFilter* filter = FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
- EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
- .With(DispatchMatcher(pid, method))
- .WillRepeatedly(testing::Return(true));
- pthread_mutex_unlock(&filter->mutex);
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+ .With(DispatchMatcher(pid, method))
+ .WillRepeatedly(testing::Return(true));
+ }
}
} // namespace process {
[05/21] mesos git commit: Update libprocess Process to use
synchronized.
Posted by be...@apache.org.
Update libprocess Process to use synchronized.
Review: https://reviews.apache.org/r/35090
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cb1283b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cb1283b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cb1283b
Branch: refs/heads/master
Commit: 9cb1283bcd942574bea07d0cf9b6748ae3869cc6
Parents: 9e7f64a
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:58:13 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/process.hpp | 20 ++++-----------
3rdparty/libprocess/src/process.cpp | 26 ++++----------------
2 files changed, 10 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index e70dd38..6a0b21d 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -2,7 +2,6 @@
#define __PROCESS_PROCESS_HPP__
#include <stdint.h>
-#include <pthread.h>
#include <map>
#include <queue>
@@ -20,6 +19,7 @@
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
+#include <stout/synchronized.hpp>
#include <stout/thread.hpp>
namespace process {
@@ -180,24 +180,14 @@ protected:
assets[name] = asset;
}
- void lock()
- {
- pthread_mutex_lock(&m);
- }
-
- void unlock()
- {
- pthread_mutex_unlock(&m);
- }
-
template<typename T>
size_t eventCount()
{
size_t count = 0U;
- lock();
- count = std::count_if(events.begin(), events.end(), isEventType<T>);
- unlock();
+ synchronized (mutex) {
+ count = std::count_if(events.begin(), events.end(), isEventType<T>);
+ }
return count;
}
@@ -226,7 +216,7 @@ private:
// Mutex protecting internals.
// TODO(benh): Consider replacing with a spinlock, on multi-core systems.
- pthread_mutex_t m;
+ std::recursive_mutex mutex;
// Enqueue the specified message, request, or function call.
void enqueue(Event* event, bool inject = false);
http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index aadd7bb..c2baa6c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2147,8 +2147,7 @@ void ProcessManager::resume(ProcessBase* process)
while (!terminate && !blocked) {
Event* event = NULL;
- process->lock();
- {
+ synchronized (process->mutex) {
if (process->events.size() > 0) {
event = process->events.front();
process->events.pop_front();
@@ -2158,7 +2157,6 @@ void ProcessManager::resume(ProcessBase* process)
blocked = true;
}
}
- process->unlock();
if (!blocked) {
CHECK(event != NULL);
@@ -2251,13 +2249,11 @@ void ProcessManager::cleanup(ProcessBase* process)
// another process that gets spawned with the same PID.
deque<Event*> events;
- process->lock();
- {
+ synchronized (process->mutex) {
process->state = ProcessBase::TERMINATING;
events = process->events;
process->events.clear();
}
- process->unlock();
// Delete pending events.
while (!events.empty()) {
@@ -2279,8 +2275,7 @@ void ProcessManager::cleanup(ProcessBase* process)
__sync_synchronize();
}
- process->lock();
- {
+ synchronized (process->mutex) {
CHECK(process->events.empty());
processes.erase(process->pid.id);
@@ -2296,7 +2291,6 @@ void ProcessManager::cleanup(ProcessBase* process)
CHECK(process->refs == 0);
process->state = ProcessBase::TERMINATED;
}
- process->unlock();
// Note that we don't remove the process from the clock during
// cleanup, but rather the clock is reset for a process when it is
@@ -2619,13 +2613,11 @@ Future<Response> ProcessManager::__processes__(const Request&)
JSON::Array* events;
} visitor(&events);
- process->lock();
- {
+ synchronized (process->mutex) {
foreach (Event* event, process->events) {
event->visit(&visitor);
}
}
- process->unlock();
object.values["events"] = events;
array.values.push_back(object);
@@ -2642,12 +2634,6 @@ ProcessBase::ProcessBase(const string& id)
state = ProcessBase::BOTTOM;
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&m, &attr);
- pthread_mutexattr_destroy(&attr);
-
refs = 0;
pid.id = id != "" ? id : ID::generate();
@@ -2669,8 +2655,7 @@ void ProcessBase::enqueue(Event* event, bool inject)
{
CHECK(event != NULL);
- lock();
- {
+ synchronized (mutex) {
if (state != TERMINATING && state != TERMINATED) {
if (!inject) {
events.push_back(event);
@@ -2690,7 +2675,6 @@ void ProcessBase::enqueue(Event* event, bool inject)
delete event;
}
}
- unlock();
}
[18/21] mesos git commit: mesos: Use CHECK_SOME, CHECK_NONE,
CHECK_ERROR.
Posted by be...@apache.org.
mesos: Use CHECK_SOME, CHECK_NONE, CHECK_ERROR.
Used `grep -r "CHECK([^\!].*\.isNone())" .` to find the instances that
look like `CHECK(x.isNone());`
Review: https://reviews.apache.org/r/35426
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26527296
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26527296
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26527296
Branch: refs/heads/master
Commit: 26527296f1e4ffc53a4545a63e1e6e0cee06e9f9
Parents: 9823648
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 07:29:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/launcher/fetcher.cpp | 2 +-
src/linux/cgroups.cpp | 2 +-
src/master/registrar.cpp | 2 +-
src/slave/containerizer/isolators/cgroups/cpushare.cpp | 2 +-
src/slave/containerizer/isolators/cgroups/mem.cpp | 2 +-
src/slave/status_update_manager.cpp | 8 ++++----
src/state/leveldb.cpp | 4 ++--
src/state/zookeeper.cpp | 6 +++---
src/zookeeper/contender.cpp | 2 +-
src/zookeeper/group.cpp | 10 +++++-----
10 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index a3d27dc..8aee490 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -385,7 +385,7 @@ int main(int argc, char* argv[])
logging::initialize(argv[0], flags, true); // Catch signals.
const Option<std::string> jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
- CHECK(jsonFetcherInfo.isSome())
+ CHECK_SOME(jsonFetcherInfo)
<< "Missing MESOS_FETCHER_INFO environment variable";
LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a612fab..6a87ac4 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -2320,7 +2320,7 @@ private:
void _listen(const process::Future<uint64_t>& future)
{
- CHECK(error.isNone());
+ CHECK_NONE(error);
if (future.isReady()) {
value_ += future.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 3fde8fa..b871229 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -419,7 +419,7 @@ void RegistrarProcess::update()
}
CHECK(!updating);
- CHECK(error.isNone());
+ CHECK_NONE(error);
CHECK_SOME(variable);
// Time how long it takes to apply the operations.
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/containerizer/isolators/cgroups/cpushare.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/cpushare.cpp b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
index 5bd3525..21e4284 100644
--- a/src/slave/containerizer/isolators/cgroups/cpushare.cpp
+++ b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
@@ -315,7 +315,7 @@ Future<Nothing> CgroupsCpushareIsolatorProcess::isolate(
Info* info = CHECK_NOTNULL(infos[containerId]);
- CHECK(info->pid.isNone());
+ CHECK_NONE(info->pid);
info->pid = pid;
foreach (const string& subsystem, subsystems) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/containerizer/isolators/cgroups/mem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp
index 7fb6c8a..9d65bf5 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.cpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.cpp
@@ -299,7 +299,7 @@ Future<Nothing> CgroupsMemIsolatorProcess::isolate(
Info* info = CHECK_NOTNULL(infos[containerId]);
- CHECK(info->pid.isNone());
+ CHECK_NONE(info->pid);
info->pid = pid;
Try<Nothing> assign = cgroups::assign(hierarchy, info->cgroup, pid);
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 1d7c4d0..35b943b 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -352,7 +352,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
// Forward the status update to the master if this is the first in the stream.
// Subsequent status updates will get sent in 'acknowledgement()'.
if (!paused && stream->pending.size() == 1) {
- CHECK(stream->timeout.isNone());
+ CHECK_NONE(stream->timeout);
const Result<StatusUpdate>& next = stream->next();
if (next.isError()) {
return Failure(next.error());
@@ -470,7 +470,7 @@ void StatusUpdateManagerProcess::timeout(const Duration& duration)
foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
CHECK_NOTNULL(stream);
if (!stream->pending.empty()) {
- CHECK(stream->timeout.isSome());
+ CHECK_SOME(stream->timeout);
if (stream->timeout.get().expired()) {
const StatusUpdate& update = stream->pending.front();
LOG(WARNING) << "Resending status update " << update;
@@ -814,7 +814,7 @@ Try<Nothing> StatusUpdateStream::handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type)
{
- CHECK(error.isNone());
+ CHECK_NONE(error);
// Checkpoint the update if necessary.
if (checkpoint) {
@@ -850,7 +850,7 @@ void StatusUpdateStream::_handle(
const StatusUpdate& update,
const StatusUpdateRecord::Type& type)
{
- CHECK(error.isNone());
+ CHECK_NONE(error);
if (type == StatusUpdateRecord::UPDATE) {
// Record this update.
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 4303df3..14a1807 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -206,7 +206,7 @@ Future<bool> LevelDBStorageProcess::expunge(const Entry& entry)
Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
{
- CHECK(error.isNone());
+ CHECK_NONE(error);
leveldb::ReadOptions options;
@@ -234,7 +234,7 @@ Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
Try<bool> LevelDBStorageProcess::write(const Entry& entry)
{
- CHECK(error.isNone());
+ CHECK_NONE(error);
leveldb::WriteOptions options;
options.sync = true;
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index d355bd7..9151d55 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -426,7 +426,7 @@ Result<std::set<string> > ZooKeeperStorageProcess::doNames()
Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
{
- CHECK(error.isNone()) << ": " << error.get();
+ CHECK_NONE(error) << ": " << error.get();
CHECK(state == CONNECTED);
string result;
@@ -460,7 +460,7 @@ Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry,
const UUID& uuid)
{
- CHECK(error.isNone()) << ": " << error.get();
+ CHECK_NONE(error) << ": " << error.get();
CHECK(state == CONNECTED);
// Serialize to make sure we're under the 1 MB limit.
@@ -558,7 +558,7 @@ Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry,
Result<bool> ZooKeeperStorageProcess::doExpunge(const Entry& entry)
{
- CHECK(error.isNone()) << ": " << error.get();
+ CHECK_NONE(error) << ": " << error.get();
CHECK(state == CONNECTED);
string result;
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index 55cb7a1..3255ef0 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -226,7 +226,7 @@ void LeaderContenderProcess::joined()
CHECK(!candidacy.isDiscarded());
// Cannot be watching because the candidacy is not obtained yet.
- CHECK(watching.isNone());
+ CHECK_NONE(watching);
CHECK_SOME(contending);
http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 173caa8..33c56da 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -267,7 +267,7 @@ Future<set<Group::Membership> > GroupProcess::watch(
// Non-retryable error.
return Failure(cached.error());
} else if (!cached.get()) {
- CHECK(memberships.isNone());
+ CHECK_NONE(memberships);
// Try again later.
if (!retrying) {
@@ -430,7 +430,7 @@ void GroupProcess::reconnecting(int64_t sessionId)
// we create a local timer and "expire" our session prematurely if
// we haven't reconnected within the session expiration time out.
// The timer can be reset if the connection is restored.
- CHECK(timer.isNone());
+ CHECK_NONE(timer);
// Use the negotiated session timeout for the reconnect timer.
timer = delay(zk->getSessionTimeout(),
@@ -533,7 +533,7 @@ void GroupProcess::updated(int64_t sessionId, const string& path)
if (cached.isError()) {
abort(cached.error()); // Cancel everything pending.
} else if (!cached.get()) {
- CHECK(memberships.isNone());
+ CHECK_NONE(memberships);
// Try again later.
if (!retrying) {
@@ -868,7 +868,7 @@ Try<bool> GroupProcess::sync()
if (memberships.isNone()) {
Try<bool> cached = cache();
if (cached.isError() || !cached.get()) {
- CHECK(memberships.isNone());
+ CHECK_NONE(memberships);
return cached;
} else {
update(); // Update any pending watches.
@@ -889,7 +889,7 @@ void GroupProcess::retry(const Duration& duration)
// We cancel the retries when the group aborts and when its ZK
// session expires so 'retrying' should be false in the condition
// check above.
- CHECK(error.isNone());
+ CHECK_NONE(error);
// In order to be retrying, we should be at least CONNECTED.
CHECK(state == CONNECTED || state == AUTHENTICATED || state == READY)
[19/21] mesos git commit: Update Mesos executor to use synchronized.
Posted by be...@apache.org.
Update Mesos executor to use synchronized.
Review: https://reviews.apache.org/r/35097
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8939609d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8939609d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8939609d
Branch: refs/heads/master
Commit: 8939609d403aa1043d637cc03647c4ee40478b20
Parents: b2d8047
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:12:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/exec/exec.cpp | 287 +++++++++++++++++++++++++------------------------
1 file changed, 145 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8939609d/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 0dfd5a6..930dda9 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -43,9 +43,9 @@
#include <stout/os.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
+#include <stout/synchronized.hpp>
#include <stout/uuid.hpp>
-#include "common/lock.hpp"
#include "common/protobuf_utils.hpp"
#include "logging/flags.hpp"
@@ -404,8 +404,9 @@ protected:
{
terminate(self());
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void abort()
@@ -413,8 +414,9 @@ protected:
LOG(INFO) << "Deactivating the executor libprocess";
CHECK(aborted);
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void _recoveryTimeout(UUID _connection)
@@ -611,173 +613,174 @@ MesosExecutorDriver::~MesosExecutorDriver()
Status MesosExecutorDriver::start()
{
- Lock lock(&mutex);
+ synchronized (mutex) {
+ if (status != DRIVER_NOT_STARTED) {
+ return status;
+ }
- if (status != DRIVER_NOT_STARTED) {
- return status;
- }
+ // Set stream buffering mode to flush on newlines so that we
+ // capture logs from user processes even when output is redirected
+ // to a file.
+ setvbuf(stdout, 0, _IOLBF, 0);
+ setvbuf(stderr, 0, _IOLBF, 0);
- // Set stream buffering mode to flush on newlines so that we capture logs
- // from user processes even when output is redirected to a file.
- setvbuf(stdout, 0, _IOLBF, 0);
- setvbuf(stderr, 0, _IOLBF, 0);
+ bool local;
- bool local;
+ UPID slave;
+ SlaveID slaveId;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+ string workDirectory;
+ bool checkpoint;
- UPID slave;
- SlaveID slaveId;
- FrameworkID frameworkId;
- ExecutorID executorId;
- string workDirectory;
- bool checkpoint;
-
- Option<string> value;
- std::istringstream iss;
+ Option<string> value;
+ std::istringstream iss;
- // Check if this is local (for example, for testing).
- local = os::getenv("MESOS_LOCAL").isSome();
+ // Check if this is local (for example, for testing).
+ local = os::getenv("MESOS_LOCAL").isSome();
- // Get slave PID from environment.
- value = os::getenv("MESOS_SLAVE_PID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
- }
+ // Get slave PID from environment.
+ value = os::getenv("MESOS_SLAVE_PID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
+ }
- slave = UPID(value.get());
- CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
+ slave = UPID(value.get());
+ CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
- // Get slave ID from environment.
- value = os::getenv("MESOS_SLAVE_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
- }
- slaveId.set_value(value.get());
+ // Get slave ID from environment.
+ value = os::getenv("MESOS_SLAVE_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
+ }
+ slaveId.set_value(value.get());
- // Get framework ID from environment.
- value = os::getenv("MESOS_FRAMEWORK_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
- }
- frameworkId.set_value(value.get());
+ // Get framework ID from environment.
+ value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
+ }
+ frameworkId.set_value(value.get());
- // Get executor ID from environment.
- value = os::getenv("MESOS_EXECUTOR_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
- }
- executorId.set_value(value.get());
+ // Get executor ID from environment.
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
+ }
+ executorId.set_value(value.get());
- // Get working directory from environment.
- value = os::getenv("MESOS_DIRECTORY");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
- }
- workDirectory = value.get();
+ // Get working directory from environment.
+ value = os::getenv("MESOS_DIRECTORY");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
+ }
+ workDirectory = value.get();
- // Get checkpointing status from environment.
- value = os::getenv("MESOS_CHECKPOINT");
- checkpoint = value.isSome() && value.get() == "1";
+ // Get checkpointing status from environment.
+ value = os::getenv("MESOS_CHECKPOINT");
+ checkpoint = value.isSome() && value.get() == "1";
- Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+ Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
- // Get the recovery timeout if checkpointing is enabled.
- if (checkpoint) {
- value = os::getenv("MESOS_RECOVERY_TIMEOUT");
+ // Get the recovery timeout if checkpointing is enabled.
+ if (checkpoint) {
+ value = os::getenv("MESOS_RECOVERY_TIMEOUT");
- if (value.isSome()) {
- Try<Duration> _recoveryTimeout = Duration::parse(value.get());
+ if (value.isSome()) {
+ Try<Duration> _recoveryTimeout = Duration::parse(value.get());
- CHECK_SOME(_recoveryTimeout)
- << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
- << _recoveryTimeout.error();
+ CHECK_SOME(_recoveryTimeout)
+ << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
+ << _recoveryTimeout.error();
- recoveryTimeout = _recoveryTimeout.get();
+ recoveryTimeout = _recoveryTimeout.get();
+ }
}
- }
- CHECK(process == NULL);
-
- process = new ExecutorProcess(
- slave,
- this,
- executor,
- slaveId,
- frameworkId,
- executorId,
- local,
- workDirectory,
- checkpoint,
- recoveryTimeout,
- &mutex,
- &cond);
-
- spawn(process);
-
- return status = DRIVER_RUNNING;
+ CHECK(process == NULL);
+
+ process = new ExecutorProcess(
+ slave,
+ this,
+ executor,
+ slaveId,
+ frameworkId,
+ executorId,
+ local,
+ workDirectory,
+ checkpoint,
+ recoveryTimeout,
+ &mutex,
+ &cond);
+
+ spawn(process);
+
+ return status = DRIVER_RUNNING;
+ }
}
Status MesosExecutorDriver::stop()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::stop);
+ dispatch(process, &ExecutorProcess::stop);
- bool aborted = status == DRIVER_ABORTED;
+ bool aborted = status == DRIVER_ABORTED;
- status = DRIVER_STOPPED;
+ status = DRIVER_STOPPED;
- return aborted ? DRIVER_ABORTED : status;
+ return aborted ? DRIVER_ABORTED : status;
+ }
}
Status MesosExecutorDriver::abort()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- // We set the volatile aborted to true here to prevent any further
- // messages from being processed in the ExecutorProcess. However,
- // if abort() is called from another thread as the ExecutorProcess,
- // there may be at most one additional message processed.
- // TODO(bmahler): Use an atomic boolean.
- process->aborted = true;
+ // We set the volatile aborted to true here to prevent any further
+ // messages from being processed in the ExecutorProcess. However,
+ // if abort() is called from another thread as the ExecutorProcess,
+ // there may be at most one additional message processed.
+ // TODO(bmahler): Use an atomic boolean.
+ process->aborted = true;
- // Dispatching here ensures that we still process the outstanding
- // requests *from* the executor, since those do proceed when
- // aborted is true.
- dispatch(process, &ExecutorProcess::abort);
+ // Dispatching here ensures that we still process the outstanding
+ // requests *from* the executor, since those do proceed when
+ // aborted is true.
+ dispatch(process, &ExecutorProcess::abort);
- return status = DRIVER_ABORTED;
+ return status = DRIVER_ABORTED;
+ }
}
Status MesosExecutorDriver::join()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- while (status == DRIVER_RUNNING) {
- pthread_cond_wait(&cond, &mutex);
- }
+ while (status == DRIVER_RUNNING) {
+ pthread_cond_wait(&cond, &mutex);
+ }
- CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+ CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
- return status;
+ return status;
+ }
}
@@ -790,31 +793,31 @@ Status MesosExecutorDriver::run()
Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
+ dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
- return status;
+ return status;
+ }
}
Status MesosExecutorDriver::sendFrameworkMessage(const string& data)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
+ dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
- return status;
+ return status;
+ }
}
[10/21] mesos git commit: Add dependent include to synchronized.hpp.
Posted by be...@apache.org.
Add dependent include to synchronized.hpp.
Review: https://reviews.apache.org/r/35088
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb8f88db
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb8f88db
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb8f88db
Branch: refs/heads/master
Commit: eb8f88db5c4e5f87159016267df6b9f771297eb1
Parents: 19b1960
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:54:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb8f88db/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
index 9b11cbc..e40ec55 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -19,6 +19,8 @@
#include <mutex>
#include <type_traits>
+#include <glog/logging.h>
+
#include <stout/preprocessor.hpp>
// An RAII class for the 'synchronized(m)' macro.
[08/21] mesos git commit: Update libprocess Gate to use synchronized.
Posted by be...@apache.org.
Update libprocess Gate to use synchronized.
Review: https://reviews.apache.org/r/35094
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb33a57f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb33a57f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb33a57f
Branch: refs/heads/master
Commit: eb33a57ff32297d885d444336da86762dc98d793
Parents: 149f42f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:51:59 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/gate.hpp | 38 ++++++++++++++---------------------
1 file changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb33a57f/3rdparty/libprocess/src/gate.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/gate.hpp b/3rdparty/libprocess/src/gate.hpp
index 69c906b..e5c9379 100644
--- a/3rdparty/libprocess/src/gate.hpp
+++ b/3rdparty/libprocess/src/gate.hpp
@@ -3,6 +3,8 @@
// TODO(benh): Build implementation directly on-top-of futex's for Linux.
+#include <stout/synchronized.hpp>
+
class Gate
{
public:
@@ -31,21 +33,21 @@ public:
// all (if 'all' is true) of the threads waiting on it.
void open(bool all = true)
{
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
state++;
- if (all) pthread_cond_broadcast(&cond);
- else pthread_cond_signal(&cond);
+ if (all) {
+ pthread_cond_broadcast(&cond);
+ } else {
+ pthread_cond_signal(&cond);
+ }
}
- pthread_mutex_unlock(&mutex);
}
// Blocks the current thread until the gate's state changes from
// the current state.
void wait()
{
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
waiters++;
state_t old = state;
while (old == state) {
@@ -53,7 +55,6 @@ public:
}
waiters--;
}
- pthread_mutex_unlock(&mutex);
}
// Gets the current state of the gate and notifies the gate about
@@ -61,14 +62,10 @@ public:
// Call 'leave()' if no longer interested in the state change.
state_t approach()
{
- state_t old;
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
waiters++;
- old = state;
+ return state;
}
- pthread_mutex_unlock(&mutex);
- return old;
}
// Blocks the current thread until the gate's state changes from
@@ -76,25 +73,22 @@ public:
// calling 'approach()'.
void arrive(state_t old)
{
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
while (old == state) {
pthread_cond_wait(&cond, &mutex);
}
+
waiters--;
}
- pthread_mutex_unlock(&mutex);
}
// Notifies the gate that a waiter (the current thread) is no
// longer waiting for the gate's state change.
void leave()
{
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
waiters--;
}
- pthread_mutex_unlock(&mutex);
}
// Returns true if there is no one waiting on the gate's state
@@ -102,11 +96,9 @@ public:
bool empty()
{
bool occupied = true;
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
occupied = waiters > 0 ? true : false;
}
- pthread_mutex_unlock(&mutex);
return !occupied;
}
};
[11/21] mesos git commit: Improvements to Synchronized.
Posted by be...@apache.org.
Improvements to Synchronized.
(1) Simplify introducing new synchronization primitives.
Before:
```cpp
template <>
class Synchronized<bufferevent>
{
public:
Synchronized(bufferevent* _bev) : bev(CHECK_NOTNULL(_bev))
{
bufferevent_lock(bev);
}
Synchronized(bufferevent** _bev) : Synchronized(*CHECK_NOTNULL(_bev)) {}
~Synchronized()
{
bufferevent_unlock(bev);
}
operator bool() const { return true; }
private:
bufferevent* bev;
};
```
After:
```cpp
Synchronized<bufferevent> synchronize(bufferevent* bev) {
return {
bev,
[](bufferevent* bev) { bufferevent_lock(bev); },
[](bufferevent* bev) { bufferevent_unlock(bev); }
};
}
```
(2) Enable `return` within `synchronized` and avoid the `control reaches end of non-void function` warning.
```cpp
int foo()
{
int x = 42;
std::mutex m;
synchronized (m) {
return x;
}
}
```
Review: https://reviews.apache.org/r/35395
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/19b1960f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/19b1960f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/19b1960f
Branch: refs/heads/master
Commit: 19b1960ffd01a61e4e0d2b7b55f5aa60a9cf8738
Parents: c683b00
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:51:50 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
.../stout/include/stout/synchronized.hpp | 156 ++++++++++++++-----
1 file changed, 120 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/19b1960f/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
index 60eaf26..9b11cbc 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -19,57 +19,141 @@
#include <mutex>
#include <type_traits>
-// A helper class for the synchronized(m) macro. It is an RAII 'guard'
-// for a synchronization primitive 'T'. The general template handles
-// cases such as 'std::mutex' and 'std::recursive_mutex'.
+#include <stout/preprocessor.hpp>
+
+// An RAII class for the 'synchronized(m)' macro.
template <typename T>
class Synchronized
{
public:
- Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
- Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
+ explicit Synchronized(T* t, void (*acquire)(T*), void (*release)(T*))
+ : t_(CHECK_NOTNULL(t)), release_(release)
+ {
+ acquire(t_);
+ }
+
+ ~Synchronized() { release_(t_); }
- ~Synchronized() { lock->unlock(); }
+ // NOTE: 'false' being returned here has no significance.
+ // Refer to the NOTE for 'synchronized' at the bottom for why.
+ explicit operator bool() const { return false; }
- operator bool() const { return true; }
private:
- T* lock;
+ T* t_;
+
+ void (*release_)(T*);
};
-// A specialization of the Synchronized class for 'std::atomic_flag'.
-// This is necessary as the locking functions are different.
-template <>
-class Synchronized<std::atomic_flag>
+// The generic version handles mutexes which have 'lock' and 'unlock'
+// member functions such as 'std::mutex' and 'std::recursive_mutex'.
+template <typename T>
+Synchronized<T> synchronize(T* t)
{
-public:
- Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
- {
- while (flag->test_and_set(std::memory_order_acquire)) {}
- }
- Synchronized(std::atomic_flag** _flag)
- : Synchronized(*CHECK_NOTNULL(_flag)) {}
+ return Synchronized<T>(
+ t,
+ [](T* t) { t->lock(); },
+ [](T* t) { t->unlock(); }
+ );
+}
- ~Synchronized()
- {
- flag->clear(std::memory_order_release);
- }
- operator bool() const { return true; }
-private:
- std::atomic_flag* flag;
-};
+// An overload of the 'synchronize' function for 'std::atomic_flag'.
+inline Synchronized<std::atomic_flag> synchronize(std::atomic_flag* lock)
+{
+ return Synchronized<std::atomic_flag>(
+ lock,
+ [](std::atomic_flag* lock) {
+ while (lock->test_and_set(std::memory_order_acquire)) {}
+ },
+ [](std::atomic_flag* lock) {
+ lock->clear(std::memory_order_release);
+ }
+ );
+}
+
+
+// An overload of the 'synchronize' function for 'pthread_mutex_t'.
+inline Synchronized<pthread_mutex_t> synchronize(pthread_mutex_t* mutex)
+{
+ return Synchronized<pthread_mutex_t>(
+ mutex,
+ [](pthread_mutex_t* mutex) {
+ pthread_mutex_lock(mutex);
+ },
+ [](pthread_mutex_t* mutex) {
+ pthread_mutex_unlock(mutex);
+ }
+ );
+}
+
+
+template <typename T>
+T* synchronized_get_pointer(T** t)
+{
+ return *CHECK_NOTNULL(t);
+}
+
+
+template <typename T>
+T* synchronized_get_pointer(T* t)
+{
+ return t;
+}
+
+
+// Macros to help generate "unique" identifiers for the
+// synchronization variable name and label. The line number gets
+// embedded which makes it unique enough, but not absolutely unique.
+// It shouldn't be a problem however, since it's very unlikely that
+// anyone would place multiple 'synchronized' blocks on one line.
+#define SYNCHRONIZED_PREFIX CAT(__synchronizer_, __LINE__)
+#define SYNCHRONIZED_VAR CAT(SYNCHRONIZED_PREFIX, _var__)
+#define SYNCHRONIZED_LABEL CAT(SYNCHRONIZED_PREFIX, _label__)
// A macro for acquiring a scoped 'guard' on any type that can satisfy
-// the 'Synchronized' interface. Currently this includes 'std::mutex',
-// 'std::recursive_mutex' and 'std::atomic_flag'.
-// Example:
-// std::mutex m;
-// synchronized (m) {
-// // Do something under the lock.
-// }
-#define synchronized(m) \
- if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
+// the 'Synchronized' interface. We support 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag' by default.
+//
+// Example usage:
+// std::mutex m;
+// synchronized (m) {
+// // Do something under the lock.
+// }
+//
+// You can easily extend support for your synchronization primitive
+// here, by overloading the 'synchronize' function.
+//
+// Example overload:
+// inline Synchronized<bufferevent> synchronize(bufferevent* bev)
+// {
+// return Synchronized<bufferevent>(
+// bev,
+// [](bufferevent* bev) { bufferevent_lock(bev); },
+// [](bufferevent* bev) { bufferevent_unlock(bev); }
+// );
+// }
+//
+// How it works: An instance of the synchronization primitive is
+// constructed inside the 'condition' of the if statement. This variable
+// stays alive for the lifetime of the block. The trick with 'goto',
+// 'else' and the label allows the compiler to figure out that the
+// synchronized block will always execute. This allows us to return
+// within the synchronized block and avoid the
+// 'control reaches the end of a non-void function' warning.
+// Note that the variable declared inside the 'condition' of an if
+// statement is guaranteed to live through the 'else' clause as well.
+//
+// From Section 6.4.3:
+// A name introduced by a declaration in a condition (either
+// introduced by the decl-specifier-seq or the declarator of the
+// condition) is in scope from its point of declaration until the
+// end of the substatements controlled by the condition.
+#define synchronized(m) \
+ if (Synchronized<typename std::remove_pointer<decltype(m)>::type> \
+ SYNCHRONIZED_VAR = ::synchronize(::synchronized_get_pointer(&m))) { \
+ goto SYNCHRONIZED_LABEL; \
+ } else SYNCHRONIZED_LABEL:
#endif // __STOUT_SYNCHRONIZED_HPP__
[03/21] mesos git commit: Update libprocess Once to use synchronized.
Posted by be...@apache.org.
Update libprocess Once to use synchronized.
Review: https://reviews.apache.org/r/35093
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/149f42fd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/149f42fd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/149f42fd
Branch: refs/heads/master
Commit: 149f42fde28bf6eabb3fef0eea6eb86b85ca53e7
Parents: dc17126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:14:50 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/once.hpp | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/149f42fd/3rdparty/libprocess/include/process/once.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/once.hpp b/3rdparty/libprocess/include/process/once.hpp
index 256ed07..2b81df3 100644
--- a/3rdparty/libprocess/include/process/once.hpp
+++ b/3rdparty/libprocess/include/process/once.hpp
@@ -4,6 +4,7 @@
#include <process/future.hpp>
#include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
namespace process {
@@ -32,8 +33,7 @@ public:
{
bool result = false;
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
if (started) {
while (!finished) {
pthread_cond_wait(&cond, &mutex);
@@ -43,7 +43,6 @@ public:
started = true;
}
}
- pthread_mutex_unlock(&mutex);
return result;
}
@@ -51,14 +50,12 @@ public:
// Transitions this Once instance to a 'done' state.
void done()
{
- pthread_mutex_lock(&mutex);
- {
+ synchronized (mutex) {
if (started && !finished) {
finished = true;
pthread_cond_broadcast(&cond);
}
}
- pthread_mutex_unlock(&mutex);
}
private:
[07/21] mesos git commit: Rebase master_contender_detector_tests with
synchronized.
Posted by be...@apache.org.
Rebase master_contender_detector_tests with synchronized.
Review: https://reviews.apache.org/r/35092
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc17126a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc17126a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc17126a
Branch: refs/heads/master
Commit: dc17126a46f2e0db17ccdf84ce0d65683d7929a5
Parents: e1a13ad
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:10:49 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
src/tests/master_contender_detector_tests.cpp | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc17126a/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index af6f15a..1b30eeb 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -282,14 +282,16 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
process::TestsFilter* filter =
process::FilterTestEventListener::instance()->install();
- pthread_mutex_lock(&filter->mutex);
-
- // Expect GroupProcess::join not getting called because
- // ZooKeeperMasterContender directly returns.
- EXPECT_CALL(filter->mock, filter(testing::A<const process::DispatchEvent&>()))
- .With(DispatchMatcher(_, &GroupProcess::join))
- .Times(0);
- pthread_mutex_unlock(&filter->mutex);
+
+ synchronized (filter->mutex) {
+ // Expect GroupProcess::join not getting called because
+ // ZooKeeperMasterContender directly returns.
+ EXPECT_CALL(
+ filter->mock,
+ filter(testing::A<const process::DispatchEvent&>()))
+ .With(DispatchMatcher(_, &GroupProcess::join))
+ .Times(0);
+ }
// Recontend and settle so that if ZooKeeperMasterContender is not
// directly returning, GroupProcess::join is dispatched.
[20/21] mesos git commit: Update HookManager to use synchronized.
Posted by be...@apache.org.
Update HookManager to use synchronized.
Review: https://reviews.apache.org/r/35100
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a69aa74
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a69aa74
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a69aa74
Branch: refs/heads/master
Commit: 5a69aa74d5d464f53629d81b7200e67f818789b4
Parents: 51fca3f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:19:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/hook/manager.cpp | 172 ++++++++++++++++++++++++----------------------
1 file changed, 91 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5a69aa74/src/hook/manager.cpp
----------------------------------------------------------------------
diff --git a/src/hook/manager.cpp b/src/hook/manager.cpp
index 54b0d34..5236035 100644
--- a/src/hook/manager.cpp
+++ b/src/hook/manager.cpp
@@ -16,8 +16,7 @@
* limitations under the License.
*/
-#include <pthread.h>
-
+#include <mutex>
#include <string>
#include <vector>
@@ -31,7 +30,6 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
-#include "common/lock.hpp"
#include "hook/manager.hpp"
#include "module/manager.hpp"
@@ -43,49 +41,52 @@ using mesos::modules::ModuleManager;
namespace mesos {
namespace internal {
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static std::mutex mutex;
static hashmap<string, Hook*> availableHooks;
Try<Nothing> HookManager::initialize(const string& hookList)
{
- Lock lock(&mutex);
-
- const vector<string> hooks = strings::split(hookList, ",");
- foreach (const string& hook, hooks) {
- if (availableHooks.contains(hook)) {
- return Error("Hook module '" + hook + "' already loaded");
- }
-
- if (!ModuleManager::contains<Hook>(hook)) {
- return Error("No hook module named '" + hook + "' available");
- }
-
- // Let's create an instance of the hook module.
- Try<Hook*> module = ModuleManager::create<Hook>(hook);
- if (module.isError()) {
- return Error(
- "Failed to instantiate hook module '" + hook + "': " +
- module.error());
+ synchronized (mutex) {
+ const vector<string> hooks = strings::split(hookList, ",");
+ foreach (const string& hook, hooks) {
+ if (availableHooks.contains(hook)) {
+ return Error("Hook module '" + hook + "' already loaded");
+ }
+
+ if (!ModuleManager::contains<Hook>(hook)) {
+ return Error("No hook module named '" + hook + "' available");
+ }
+
+ // Let's create an instance of the hook module.
+ Try<Hook*> module = ModuleManager::create<Hook>(hook);
+ if (module.isError()) {
+ return Error(
+ "Failed to instantiate hook module '" + hook + "': " +
+ module.error());
+ }
+
+ // Add the hook module to the list of available hooks.
+ availableHooks[hook] = module.get();
}
-
- // Add the hook module to the list of available hooks.
- availableHooks[hook] = module.get();
}
+
return Nothing();
}
Try<Nothing> HookManager::unload(const std::string& hookName)
{
- Lock lock(&mutex);
- if (!availableHooks.contains(hookName)) {
- return Error(
- "Error unloading hook module '" + hookName + "': module not loaded");
+ synchronized (mutex) {
+ if (!availableHooks.contains(hookName)) {
+ return Error(
+ "Error unloading hook module '" + hookName + "': module not loaded");
+ }
+
+ // Now remove the hook from the list of available hooks.
+ availableHooks.erase(hookName);
}
- // Now remove the hook from the list of available hooks.
- availableHooks.erase(hookName);
return Nothing();
}
@@ -95,28 +96,33 @@ Labels HookManager::masterLaunchTaskLabelDecorator(
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo)
{
- Lock lock(&mutex);
-
- // We need a mutable copy of the task info and set the new
- // labels after each hook invocation. Otherwise, the last hook
- // will be the only effective hook setting the labels.
- TaskInfo taskInfo_ = taskInfo;
-
- foreachpair (const string& name, Hook* hook, availableHooks) {
- const Result<Labels>& result =
- hook->masterLaunchTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
-
- // NOTE: If the hook returns None(), the task labels won't be
- // changed.
- if (result.isSome()) {
- taskInfo_.mutable_labels()->CopyFrom(result.get());
- } else if (result.isError()) {
- LOG(WARNING) << "Master label decorator hook failed for module '"
- << name << "': " << result.error();
+ synchronized (mutex) {
+ // We need a mutable copy of the task info and set the new
+ // labels after each hook invocation. Otherwise, the last hook
+ // will be the only effective hook setting the labels.
+ TaskInfo taskInfo_ = taskInfo;
+
+ foreachpair (const string& name, Hook* hook, availableHooks) {
+ const Result<Labels>& result =
+ hook->masterLaunchTaskLabelDecorator(
+ taskInfo_,
+ frameworkInfo,
+ slaveInfo);
+
+ // NOTE: If the hook returns None(), the task labels won't be
+ // changed.
+ if (result.isSome()) {
+ taskInfo_.mutable_labels()->CopyFrom(result.get());
+ } else if (result.isError()) {
+ LOG(WARNING) << "Master label decorator hook failed for module '"
+ << name << "': " << result.error();
+ }
}
+
+ return taskInfo_.labels();
}
- return taskInfo_.labels();
+ UNREACHABLE();
}
@@ -125,49 +131,53 @@ Labels HookManager::slaveRunTaskLabelDecorator(
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo)
{
- Lock lock(&mutex);
-
- TaskInfo taskInfo_ = taskInfo;
-
- foreachpair (const string& name, Hook* hook, availableHooks) {
- const Result<Labels>& result =
- hook->slaveRunTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
-
- // NOTE: If the hook returns None(), the task labels won't be
- // changed.
- if (result.isSome()) {
- taskInfo_.mutable_labels()->CopyFrom(result.get());
- } else if (result.isError()) {
- LOG(WARNING) << "Slave label decorator hook failed for module '"
- << name << "': " << result.error();
+ synchronized (mutex) {
+ TaskInfo taskInfo_ = taskInfo;
+
+ foreachpair (const string& name, Hook* hook, availableHooks) {
+ const Result<Labels>& result =
+ hook->slaveRunTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
+
+ // NOTE: If the hook returns None(), the task labels won't be
+ // changed.
+ if (result.isSome()) {
+ taskInfo_.mutable_labels()->CopyFrom(result.get());
+ } else if (result.isError()) {
+ LOG(WARNING) << "Slave label decorator hook failed for module '"
+ << name << "': " << result.error();
+ }
}
+
+ return taskInfo_.labels();
}
- return taskInfo_.labels();
+ UNREACHABLE();
}
Environment HookManager::slaveExecutorEnvironmentDecorator(
ExecutorInfo executorInfo)
{
- Lock lock(&mutex);
-
- foreachpair (const string& name, Hook* hook, availableHooks) {
- const Result<Environment>& result =
- hook->slaveExecutorEnvironmentDecorator(executorInfo);
-
- // NOTE: If the hook returns None(), the environment won't be
- // changed.
- if (result.isSome()) {
- executorInfo.mutable_command()->mutable_environment()->CopyFrom(
- result.get());
- } else if (result.isError()) {
- LOG(WARNING) << "Slave environment decorator hook failed for module '"
- << name << "': " << result.error();
+ synchronized (mutex) {
+ foreachpair (const string& name, Hook* hook, availableHooks) {
+ const Result<Environment>& result =
+ hook->slaveExecutorEnvironmentDecorator(executorInfo);
+
+ // NOTE: If the hook returns None(), the environment won't be
+ // changed.
+ if (result.isSome()) {
+ executorInfo.mutable_command()->mutable_environment()->CopyFrom(
+ result.get());
+ } else if (result.isError()) {
+ LOG(WARNING) << "Slave environment decorator hook failed for module '"
+ << name << "': " << result.error();
+ }
}
+
+ return executorInfo.command().environment();
}
- return executorInfo.command().environment();
+ UNREACHABLE();
}
[17/21] mesos git commit: Remove src/common/lock. Use synchronized
instead.
Posted by be...@apache.org.
Remove src/common/lock. Use synchronized instead.
Review: https://reviews.apache.org/r/35102
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad378345
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad378345
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad378345
Branch: refs/heads/master
Commit: ad378345f7fe12d4c091d46d44f2485b09fba40e
Parents: f4aaa14
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:24:11 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/Makefile.am | 2 --
src/common/lock.cpp | 55 ------------------------------------------------
src/common/lock.hpp | 45 ---------------------------------------
3 files changed, 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3c44f01..884533e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -353,7 +353,6 @@ libmesos_no_3rdparty_la_SOURCES = \
common/attributes.cpp \
common/date_utils.cpp \
common/http.cpp \
- common/lock.cpp \
common/protobuf_utils.cpp \
common/resources.cpp \
common/resources_utils.cpp \
@@ -580,7 +579,6 @@ libmesos_no_3rdparty_la_SOURCES += \
common/date_utils.hpp \
common/factory.hpp \
common/http.hpp \
- common/lock.hpp \
common/parse.hpp \
common/protobuf_utils.hpp \
common/resources_utils.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/common/lock.cpp
----------------------------------------------------------------------
diff --git a/src/common/lock.cpp b/src/common/lock.cpp
deleted file mode 100644
index bb8ea3a..0000000
--- a/src/common/lock.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "lock.hpp"
-
-namespace mesos {
-namespace internal {
-
-Lock::Lock(pthread_mutex_t* _mutex)
- : mutex(_mutex), locked(false)
-{
- lock();
-}
-
-
-void Lock::lock()
-{
- if (!locked) {
- pthread_mutex_lock(mutex);
- locked = true;
- }
-}
-
-
-void Lock::unlock()
-{
- if (locked) {
- locked = false;
- pthread_mutex_unlock(mutex);
- }
-}
-
-
-Lock::~Lock()
-{
- unlock();
-}
-
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/common/lock.hpp
----------------------------------------------------------------------
diff --git a/src/common/lock.hpp b/src/common/lock.hpp
deleted file mode 100644
index 988dff5..0000000
--- a/src/common/lock.hpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LOCK_HPP__
-#define __LOCK_HPP__
-
-#include <pthread.h>
-
-namespace mesos {
-namespace internal {
-
-// RAII class for locking pthread_mutexes.
-class Lock
-{
-public:
- explicit Lock(pthread_mutex_t* _mutex);
- ~Lock();
-
- void lock();
- void unlock();
-
-private:
- pthread_mutex_t* mutex;
- bool locked;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __LOCK_HPP__
[14/21] mesos git commit: Update cram_md5 to use synchronized.
Posted by be...@apache.org.
Update cram_md5 to use synchronized.
Review: https://reviews.apache.org/r/35099
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51fca3f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51fca3f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51fca3f7
Branch: refs/heads/master
Commit: 51fca3f7efa0fd8abd11c22d87092b13f2ba0318
Parents: 98039c9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:18:56 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/authentication/cram_md5/auxprop.cpp | 5 +++--
src/authentication/cram_md5/auxprop.hpp | 25 +++++++++++++------------
2 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/51fca3f7/src/authentication/cram_md5/auxprop.cpp
----------------------------------------------------------------------
diff --git a/src/authentication/cram_md5/auxprop.cpp b/src/authentication/cram_md5/auxprop.cpp
index 984ed90..abf0f8d 100644
--- a/src/authentication/cram_md5/auxprop.cpp
+++ b/src/authentication/cram_md5/auxprop.cpp
@@ -18,6 +18,8 @@
#include "authentication/cram_md5/auxprop.hpp"
+#include <mutex>
+
#include "logging/logging.hpp"
using std::list;
@@ -30,8 +32,7 @@ namespace cram_md5 {
// Storage for the static members.
Multimap<string, Property> InMemoryAuxiliaryPropertyPlugin::properties;
sasl_auxprop_plug_t InMemoryAuxiliaryPropertyPlugin::plugin;
-pthread_mutex_t InMemoryAuxiliaryPropertyPlugin::mutex =
- PTHREAD_MUTEX_INITIALIZER;
+std::mutex InMemoryAuxiliaryPropertyPlugin::mutex;
int InMemoryAuxiliaryPropertyPlugin::initialize(
http://git-wip-us.apache.org/repos/asf/mesos/blob/51fca3f7/src/authentication/cram_md5/auxprop.hpp
----------------------------------------------------------------------
diff --git a/src/authentication/cram_md5/auxprop.hpp b/src/authentication/cram_md5/auxprop.hpp
index 1a054ba..0fa87f4 100644
--- a/src/authentication/cram_md5/auxprop.hpp
+++ b/src/authentication/cram_md5/auxprop.hpp
@@ -19,8 +19,7 @@
#ifndef __AUTHENTICATION_CRAM_MD5_AUXPROP_HPP__
#define __AUTHENTICATION_CRAM_MD5_AUXPROP_HPP__
-#include <pthread.h>
-
+#include <mutex>
#include <string>
#include <sasl/sasl.h>
@@ -30,8 +29,7 @@
#include <stout/multimap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
-
-#include "common/lock.hpp"
+#include <stout/synchronized.hpp>
namespace mesos {
namespace internal {
@@ -51,22 +49,25 @@ public:
static void load(const Multimap<std::string, Property>& _properties)
{
- Lock lock(&mutex);
- properties = _properties;
+ synchronized (mutex) {
+ properties = _properties;
+ }
}
static Option<std::list<std::string>> lookup(
const std::string& user,
const std::string& name)
{
- Lock lock(&mutex);
- if (properties.contains(user)) {
- foreach (const Property& property, properties.get(user)) {
- if (property.name == name) {
- return property.values;
+ synchronized (mutex) {
+ if (properties.contains(user)) {
+ foreach (const Property& property, properties.get(user)) {
+ if (property.name == name) {
+ return property.values;
+ }
}
}
}
+
return None();
}
@@ -98,7 +99,7 @@ private:
// Access to 'properties' has to be protected as multiple
// authenticator instances may be active concurrently.
- static pthread_mutex_t mutex;
+ static std::mutex mutex;
};
} // namespace cram_md5 {
[06/21] mesos git commit: libprocess: Rebased for _CheckFatal changes in stout.
Posted by be...@apache.org.
libprocess: Rebased for _CheckFatal changes in stout.
Review: https://reviews.apache.org/r/35423
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c683b00b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c683b00b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c683b00b
Branch: refs/heads/master
Commit: c683b00bc7a01099f0ef614606b053e9b4fbb94f
Parents: 5f7c5a7
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:25:03 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/check.hpp | 67 ++++++++++------------
1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c683b00b/3rdparty/libprocess/include/process/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/check.hpp b/3rdparty/libprocess/include/process/check.hpp
index 60989ac..4b43b70 100644
--- a/3rdparty/libprocess/include/process/check.hpp
+++ b/3rdparty/libprocess/include/process/check.hpp
@@ -25,79 +25,69 @@
// This appends the error if possible to the end of the log message, so there's
// no need to append the error message explicitly.
#define CHECK_PENDING(expression) \
- for (const Option<std::string> _error = _checkPending(expression); \
- _error.isSome();) \
- _CheckFatal(__FILE__, __LINE__, "CHECK_PENDING", \
- #expression, _error.get()).stream()
+ CHECK_STATE(CHECK_PENDING, _check_pending, expression)
#define CHECK_READY(expression) \
- for (const Option<std::string> _error = _checkReady(expression); \
- _error.isSome();) \
- _CheckFatal(__FILE__, __LINE__, "CHECK_READY", \
- #expression, _error.get()).stream()
+ CHECK_STATE(CHECK_READY, _check_ready, expression)
#define CHECK_DISCARDED(expression) \
- for (const Option<std::string> _error = _checkDiscarded(expression); \
- _error.isSome();) \
- _CheckFatal(__FILE__, __LINE__, "CHECK_DISCARDED", \
- #expression, _error.get()).stream()
+ CHECK_STATE(CHECK_DISCARDED, _check_discarded, expression)
#define CHECK_FAILED(expression) \
- for (const Option<std::string> _error = _checkFailed(expression); \
- _error.isSome();) \
- _CheckFatal(__FILE__, __LINE__, "CHECK_FAILED", \
- #expression, _error.get()).stream()
-
+ CHECK_STATE(CHECK_FAILED, _check_failed, expression)
// Private structs/functions used for CHECK_*.
template <typename T>
-Option<std::string> _checkPending(const process::Future<T>& f)
+Option<Error> _check_pending(const process::Future<T>& f)
{
if (f.isReady()) {
- return Some("is READY");
+ return Error("is READY");
} else if (f.isDiscarded()) {
- return Some("is DISCARDED");
+ return Error("is DISCARDED");
} else if (f.isFailed()) {
- return Some("is FAILED: " + f.failure());
+ return Error("is FAILED: " + f.failure());
+ } else {
+ CHECK(f.isPending());
+ return None();
}
- CHECK(f.isPending());
- return None();
}
template <typename T>
-Option<std::string> _checkReady(const process::Future<T>& f)
+Option<Error> _check_ready(const process::Future<T>& f)
{
if (f.isPending()) {
- return Some("is PENDING");
+ return Error("is PENDING");
} else if (f.isDiscarded()) {
- return Some("is DISCARDED");
+ return Error("is DISCARDED");
} else if (f.isFailed()) {
- return Some("is FAILED: " + f.failure());
+ return Error("is FAILED: " + f.failure());
+ } else {
+ CHECK(f.isReady());
+ return None();
}
- CHECK(f.isReady());
- return None();
}
template <typename T>
-Option<std::string> _checkDiscarded(const process::Future<T>& f)
+Option<Error> _check_discarded(const process::Future<T>& f)
{
if (f.isPending()) {
- return Some("is PENDING");
+ return Error("is PENDING");
} else if (f.isReady()) {
- return Some("is READY");
+ return Error("is READY");
} else if (f.isFailed()) {
- return Some("is FAILED: " + f.failure());
+ return Error("is FAILED: " + f.failure());
+ } else {
+ CHECK(f.isDiscarded());
+ return None();
}
- CHECK(f.isDiscarded());
- return None();
}
template <typename T>
-Option<std::string> _checkFailed(const process::Future<T>& f)
+Option<Error> _check_failed(const process::Future<T>& f)
{
if (f.isPending()) {
return Some("is PENDING");
@@ -105,9 +95,10 @@ Option<std::string> _checkFailed(const process::Future<T>& f)
return Some("is READY");
} else if (f.isDiscarded()) {
return Some("is DISCARDED");
+ } else {
+ CHECK(f.isFailed());
+ return None();
}
- CHECK(f.isFailed());
- return None();
}
// TODO(dhamon): CHECK_NPENDING, CHECK_NREADY, etc.
[12/21] mesos git commit: libprocess: Use CHECK_NONE.
Posted by be...@apache.org.
libprocess: Use CHECK_NONE.
Used `grep -r "CHECK([^\!].*\.isNone())" .` to find the instances that
look like `CHECK(x.isNone());`
Review: https://reviews.apache.org/r/35425
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/98236481
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/98236481
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/98236481
Branch: refs/heads/master
Commit: 9823648125a5fe560f9c278ee0a1239d7a8eabfb
Parents: ad37834
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 07:28:48 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/decoder.hpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/98236481/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index 56adde0..85ce9e3 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -555,7 +555,7 @@ private:
decoder->value.clear();
CHECK(decoder->response == NULL);
- CHECK(decoder->writer.isNone());
+ CHECK_NONE(decoder->writer);
decoder->response = new http::Response();
decoder->response->type = http::Response::PIPE;
@@ -642,7 +642,7 @@ private:
return 1;
}
- CHECK(decoder->writer.isNone());
+ CHECK_NONE(decoder->writer);
http::Pipe pipe;
decoder->writer = pipe.writer();
[21/21] mesos git commit: Update ModuleManager to use synchronized.
Posted by be...@apache.org.
Update ModuleManager to use synchronized.
Review: https://reviews.apache.org/r/35098
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/98039c99
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/98039c99
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/98039c99
Branch: refs/heads/master
Commit: 98039c99b0a2e0a36a7d4c22e672ecd817923bb4
Parents: 8939609
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:15:59 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/module/manager.cpp | 129 ++++++++++++++++++++++----------------------
src/module/manager.hpp | 71 ++++++++++++------------
2 files changed, 101 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/98039c99/src/module/manager.cpp
----------------------------------------------------------------------
diff --git a/src/module/manager.cpp b/src/module/manager.cpp
index 2c7f876..909ca56 100644
--- a/src/module/manager.cpp
+++ b/src/module/manager.cpp
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include <mutex>
#include <string>
#include <vector>
@@ -31,8 +32,6 @@
#include <stout/stringify.hpp>
#include <stout/version.hpp>
-#include "common/lock.hpp"
-
#include "messages/messages.hpp"
#include "module/manager.hpp"
@@ -46,7 +45,7 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::modules;
-pthread_mutex_t ModuleManager::mutex = PTHREAD_MUTEX_INITIALIZER;
+std::mutex ModuleManager::mutex;
hashmap<const string, string> ModuleManager::kindToVersion;
hashmap<const string, ModuleBase*> ModuleManager::moduleBases;
hashmap<const string, Owned<DynamicLibrary>> ModuleManager::dynamicLibraries;
@@ -104,15 +103,16 @@ void ModuleManager::initialize()
// of ModuleBases.
Try<Nothing> ModuleManager::unload(const string& moduleName)
{
- Lock lock(&mutex);
- if (!moduleBases.contains(moduleName)) {
- return Error(
- "Error unloading module '" + moduleName + "': module not loaded");
- }
+ synchronized (mutex) {
+ if (!moduleBases.contains(moduleName)) {
+ return Error(
+ "Error unloading module '" + moduleName + "': module not loaded");
+ }
- // Do not remove the dynamiclibrary as it could result in unloading
- // the library from the process memory.
- moduleBases.erase(moduleName);
+ // Do not remove the dynamiclibrary as it could result in
+ // unloading the library from the process memory.
+ moduleBases.erase(moduleName);
+ }
return Nothing();
}
@@ -189,64 +189,67 @@ Try<Nothing> ModuleManager::verifyModule(
Try<Nothing> ModuleManager::load(const Modules& modules)
{
- Lock lock(&mutex);
- initialize();
-
- foreach (const Modules::Library& library, modules.libraries()) {
- string libraryName;
- if (library.has_file()) {
- libraryName = library.file();
- } else if (library.has_name()) {
- libraryName = os::libraries::expandName(library.name());
- } else {
- return Error("Library name or path not provided");
- }
-
- if (!dynamicLibraries.contains(libraryName)) {
- Owned<DynamicLibrary> dynamicLibrary(new DynamicLibrary());
- Try<Nothing> result = dynamicLibrary->open(libraryName);
- if (!result.isSome()) {
- return Error(
- "Error opening library: '" + libraryName + "': " + result.error());
+ synchronized (mutex) {
+ initialize();
+
+ foreach (const Modules::Library& library, modules.libraries()) {
+ string libraryName;
+ if (library.has_file()) {
+ libraryName = library.file();
+ } else if (library.has_name()) {
+ libraryName = os::libraries::expandName(library.name());
+ } else {
+ return Error("Library name or path not provided");
}
- dynamicLibraries[libraryName] = dynamicLibrary;
- }
-
- // Load module manifests.
- foreach (const Modules::Library::Module& module, library.modules()) {
- if (!module.has_name()) {
- return Error(
- "Error: module name not provided with library '" + libraryName +
- "'");
- }
+ if (!dynamicLibraries.contains(libraryName)) {
+ Owned<DynamicLibrary> dynamicLibrary(new DynamicLibrary());
+ Try<Nothing> result = dynamicLibrary->open(libraryName);
+ if (!result.isSome()) {
+ return Error(
+ "Error opening library: '" + libraryName +
+ "': " + result.error());
+ }
- // Check for possible duplicate module names.
- const std::string moduleName = module.name();
- if (moduleBases.contains(moduleName)) {
- return Error("Error loading duplicate module '" + moduleName + "'");
+ dynamicLibraries[libraryName] = dynamicLibrary;
}
- // Load ModuleBase.
- Try<void*> symbol = dynamicLibraries[libraryName]->loadSymbol(moduleName);
- if (symbol.isError()) {
- return Error(
- "Error loading module '" + moduleName + "': " + symbol.error());
+ // Load module manifests.
+ foreach (const Modules::Library::Module& module, library.modules()) {
+ if (!module.has_name()) {
+ return Error(
+ "Error: module name not provided with library '" + libraryName +
+ "'");
+ }
+
+ // Check for possible duplicate module names.
+ const std::string moduleName = module.name();
+ if (moduleBases.contains(moduleName)) {
+ return Error("Error loading duplicate module '" + moduleName + "'");
+ }
+
+ // Load ModuleBase.
+ Try<void*> symbol =
+ dynamicLibraries[libraryName]->loadSymbol(moduleName);
+ if (symbol.isError()) {
+ return Error(
+ "Error loading module '" + moduleName + "': " + symbol.error());
+ }
+ ModuleBase* moduleBase = (ModuleBase*) symbol.get();
+
+ // Verify module compatibility including version, etc.
+ Try<Nothing> result = verifyModule(moduleName, moduleBase);
+ if (result.isError()) {
+ return Error(
+ "Error verifying module '" + moduleName + "': " + result.error());
+ }
+
+ moduleBases[moduleName] = (ModuleBase*) symbol.get();
+
+ // Now copy the supplied module-specific parameters.
+ moduleParameters[moduleName].mutable_parameter()->CopyFrom(
+ module.parameters());
}
- ModuleBase* moduleBase = (ModuleBase*) symbol.get();
-
- // Verify module compatibility including version, etc.
- Try<Nothing> result = verifyModule(moduleName, moduleBase);
- if (result.isError()) {
- return Error(
- "Error verifying module '" + moduleName + "': " + result.error());
- }
-
- moduleBases[moduleName] = (ModuleBase*) symbol.get();
-
- // Now copy the supplied module-specific parameters.
- moduleParameters[moduleName].mutable_parameter()->CopyFrom(
- module.parameters());
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/98039c99/src/module/manager.hpp
----------------------------------------------------------------------
diff --git a/src/module/manager.hpp b/src/module/manager.hpp
index 4befb64..cab67a8 100644
--- a/src/module/manager.hpp
+++ b/src/module/manager.hpp
@@ -19,9 +19,8 @@
#ifndef __MODULE_MANAGER_HPP__
#define __MODULE_MANAGER_HPP__
-#include <pthread.h>
-
#include <list>
+#include <mutex>
#include <string>
#include <vector>
@@ -37,8 +36,8 @@
#include <stout/check.hpp>
#include <stout/dynamiclibrary.hpp>
#include <stout/hashmap.hpp>
+#include <stout/synchronized.hpp>
-#include "common/lock.hpp"
#include "messages/messages.hpp"
namespace mesos {
@@ -71,40 +70,42 @@ public:
template <typename T>
static Try<T*> create(const std::string& moduleName)
{
- mesos::internal::Lock lock(&mutex);
- if (!moduleBases.contains(moduleName)) {
- return Error(
- "Module '" + moduleName + "' unknown");
- }
+ synchronized (mutex) {
+ if (!moduleBases.contains(moduleName)) {
+ return Error(
+ "Module '" + moduleName + "' unknown");
+ }
- Module<T>* module = (Module<T>*) moduleBases[moduleName];
- if (module->create == NULL) {
- return Error(
- "Error creating module instance for '" + moduleName + "': "
- "create() method not found");
- }
+ Module<T>* module = (Module<T>*) moduleBases[moduleName];
+ if (module->create == NULL) {
+ return Error(
+ "Error creating module instance for '" + moduleName + "': "
+ "create() method not found");
+ }
- std::string expectedKind = kind<T>();
- if (expectedKind != module->kind) {
- return Error(
- "Error creating module instance for '" + moduleName + "': "
- "module is of kind '" + module->kind + "', but the requested "
- "kind is '" + expectedKind + "'");
- }
+ std::string expectedKind = kind<T>();
+ if (expectedKind != module->kind) {
+ return Error(
+ "Error creating module instance for '" + moduleName + "': "
+ "module is of kind '" + module->kind + "', but the requested "
+ "kind is '" + expectedKind + "'");
+ }
- T* instance = module->create(moduleParameters[moduleName]);
- if (instance == NULL) {
- return Error("Error creating Module instance for '" + moduleName + "'");
+ T* instance = module->create(moduleParameters[moduleName]);
+ if (instance == NULL) {
+ return Error("Error creating Module instance for '" + moduleName + "'");
+ }
+ return instance;
}
- return instance;
}
template <typename T>
static bool contains(const std::string& moduleName)
{
- mesos::internal::Lock lock(&mutex);
- return (moduleBases.contains(moduleName) &&
- moduleBases[moduleName]->kind == stringify(kind<T>()));
+ synchronized (mutex) {
+ return (moduleBases.contains(moduleName) &&
+ moduleBases[moduleName]->kind == stringify(kind<T>()));
+ }
}
// Returns all module names that have been loaded that implement the
@@ -117,13 +118,13 @@ public:
template <typename T>
static std::vector<std::string> find()
{
- mesos::internal::Lock lock(&mutex);
-
std::vector<std::string> names;
- foreachpair (const std::string& name, ModuleBase* base, moduleBases) {
- if (base->kind == stringify(kind<T>())) {
- names.push_back(name);
+ synchronized (mutex) {
+ foreachpair (const std::string& name, ModuleBase* base, moduleBases) {
+ if (base->kind == stringify(kind<T>())) {
+ names.push_back(name);
+ }
}
}
@@ -141,9 +142,7 @@ private:
const std::string& moduleName,
const ModuleBase* moduleBase);
- // TODO(karya): Replace pthread_mutex_t with std::mutex in
- // common/lock.hpp and other places that refer to it.
- static pthread_mutex_t mutex;
+ static std::mutex mutex;
static hashmap<const std::string, std::string> kindToVersion;