You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:19 UTC

[01/21] mesos git commit: stout: Introduce CHECK_NONE and CHECK_ERROR.

Repository: mesos
Updated Branches:
  refs/heads/master 763364e86 -> 26527296f


stout: Introduce CHECK_NONE and CHECK_ERROR.

Review: https://reviews.apache.org/r/35422


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5f7c5a7d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5f7c5a7d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5f7c5a7d

Branch: refs/heads/master
Commit: 5f7c5a7d95aa4065e85accf13149b4eb3d798e8c
Parents: 93b60ab
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:24:24 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:42:59 2015 -0700

----------------------------------------------------------------------
 .../3rdparty/stout/include/stout/check.hpp      | 115 ++++++++++++++++---
 1 file changed, 96 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5f7c5a7d/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
index 3972383..b550bfe 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/check.hpp
@@ -21,52 +21,129 @@
 #include <glog/logging.h>
 
 #include <stout/abort.hpp>
+#include <stout/error.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/result.hpp>
 #include <stout/some.hpp>
 #include <stout/try.hpp>
 
-// Provides a CHECK_SOME macro, akin to CHECK.
+// A generic macro to faciliate definitions of CHECK_*, akin to CHECK.
 // This appends the error if possible to the end of the log message,
 // so there's no need to append the error message explicitly.
-#define CHECK_SOME(expression)                                          \
-  for (const Option<std::string> _error = _check(expression);           \
-       _error.isSome();)                                                \
-    _CheckFatal(__FILE__, __LINE__, "CHECK_SOME",                       \
-                #expression, _error.get()).stream()
+// To define a new CHECK_*, provide the name, the function that performs the
+// check, and the expression. See below for examples (e.g. CHECK_SOME).
+#define CHECK_STATE(name, check, expression)                             \
+  for (const Option<Error> _error = check(expression); _error.isSome();) \
+    _CheckFatal(__FILE__,                                                \
+                __LINE__,                                                \
+                #name,                                                   \
+                #expression,                                             \
+                _error.get()).stream()
+
+
+#define CHECK_SOME(expression) \
+  CHECK_STATE(CHECK_SOME, _check_some, expression)
+
+
+#define CHECK_NONE(expression) \
+  CHECK_STATE(CHECK_NONE, _check_none, expression)
+
+
+#define CHECK_ERROR(expression) \
+  CHECK_STATE(CHECK_ERROR, _check_error, expression)
+
+
+// Private structs/functions used for CHECK_*.
 
-// Private structs/functions used for CHECK_SOME.
 
 template <typename T>
-Option<std::string> _check(const Option<T>& o)
+Option<Error> _check_some(const Option<T>& o)
 {
   if (o.isNone()) {
-    return Some("is NONE");
+    return Error("is NONE");
+  } else {
+    CHECK(o.isSome());
+    return None();
   }
-  return None();
 }
 
 
 template <typename T>
-Option<std::string> _check(const Try<T>& t)
+Option<Error> _check_some(const Try<T>& t)
 {
   if (t.isError()) {
-    return t.error();
+    return Error(t.error());
+  } else {
+    CHECK(t.isSome());
+    return None();
   }
-  return None();
 }
 
 
 template <typename T>
-Option<std::string> _check(const Result<T>& r)
+Option<Error> _check_some(const Result<T>& r)
 {
   if (r.isError()) {
-    return r.error();
+    return Error(r.error());
   } else if (r.isNone()) {
-    return Some("is NONE");
+    return Error("is NONE");
+  } else {
+    CHECK(r.isSome());
+    return None();
+  }
+}
+
+
+template <typename T>
+Option<Error> _check_none(const Option<T>& o)
+{
+  if (o.isSome()) {
+    return Error("is SOME");
+  } else {
+    CHECK(o.isNone());
+    return None();
+  }
+}
+
+
+template <typename T>
+Option<Error> _check_none(const Result<T>& r)
+{
+  if (r.isError()) {
+    return Error("is ERROR");
+  } else if (r.isSome()) {
+    return Error("is SOME");
+  } else {
+    CHECK(r.isNone());
+    return None();
+  }
+}
+
+
+template <typename T>
+Option<Error> _check_error(const Try<T>& t)
+{
+  if (t.isSome()) {
+    return Error("is SOME");
+  } else {
+    CHECK(t.isError());
+    return None();
+  }
+}
+
+
+template <typename T>
+Option<Error> _check_error(const Result<T>& r)
+{
+  if (r.isNone()) {
+    return Error("is NONE");
+  } else if (r.isSome()) {
+    return Error("is SOME");
+  } else {
+    CHECK(r.isError());
+    return None();
   }
-  return None();
 }
 
 
@@ -76,11 +153,11 @@ struct _CheckFatal
               int _line,
               const char* type,
               const char* expression,
-              const std::string& error)
+              const Error& error)
       : file(_file),
         line(_line)
   {
-    out << type << "(" << expression << "): " << error << " ";
+    out << type << "(" << expression << "): " << error.message << " ";
   }
 
   ~_CheckFatal()


[09/21] mesos git commit: Replace std::lock_guard with synchronized in fetcher_cache_test.

Posted by be...@apache.org.
Replace std::lock_guard with synchronized in fetcher_cache_test.

Review: https://reviews.apache.org/r/35089


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e7f64a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e7f64a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e7f64a8

Branch: refs/heads/master
Commit: 9e7f64a8c5c2d12fb6ca5366aec2ebc826ad8fa1
Parents: eb8f88d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:55:56 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 src/tests/fetcher_cache_tests.cpp | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7f64a8/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index d6d768d..8bd5dd8 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -44,8 +44,6 @@
 #include <stout/os.hpp>
 #include <stout/try.hpp>
 
-#include "common/lock.hpp"
-
 #include "master/flags.hpp"
 #include "master/master.hpp"
 
@@ -767,19 +765,19 @@ public:
       // this function via a Queue. This is currently non-trivial
       // because we can't copy an HttpEvent so we're _forced_ to block
       // the thread synchronously.
-      std::lock_guard<std::mutex> lock(mutex);
+      synchronized (mutex) {
+        countRequests++;
 
-      countRequests++;
+        if (strings::contains(event.request->path, COMMAND_NAME)) {
+          countCommandRequests++;
+        }
 
-      if (strings::contains(event.request->path, COMMAND_NAME)) {
-        countCommandRequests++;
-      }
+        if (strings::contains(event.request->path, ARCHIVE_NAME)) {
+          countArchiveRequests++;
+        }
 
-      if (strings::contains(event.request->path, ARCHIVE_NAME)) {
-        countArchiveRequests++;
+        ProcessBase::visit(event);
       }
-
-      ProcessBase::visit(event);
     }
 
     void resetCounts()


[16/21] mesos git commit: Update ZooKeeper tests to use synchronized.

Posted by be...@apache.org.
Update ZooKeeper tests to use synchronized.

Review: https://reviews.apache.org/r/35101


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f4aaa143
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f4aaa143
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f4aaa143

Branch: refs/heads/master
Commit: f4aaa14392298ca5cbde260560ee32e73f9f2328
Parents: 5a69aa7
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:20:47 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/hook/manager.cpp    |  6 ------
 src/tests/zookeeper.cpp | 25 +++++++++++++------------
 src/tests/zookeeper.hpp |  1 -
 3 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/hook/manager.cpp
----------------------------------------------------------------------
diff --git a/src/hook/manager.cpp b/src/hook/manager.cpp
index 5236035..b43b918 100644
--- a/src/hook/manager.cpp
+++ b/src/hook/manager.cpp
@@ -121,8 +121,6 @@ Labels HookManager::masterLaunchTaskLabelDecorator(
 
     return taskInfo_.labels();
   }
-
-  UNREACHABLE();
 }
 
 
@@ -150,8 +148,6 @@ Labels HookManager::slaveRunTaskLabelDecorator(
 
     return taskInfo_.labels();
   }
-
-  UNREACHABLE();
 }
 
 
@@ -176,8 +172,6 @@ Environment HookManager::slaveExecutorEnvironmentDecorator(
 
     return executorInfo.command().environment();
   }
-
-  UNREACHABLE();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/tests/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper.cpp b/src/tests/zookeeper.cpp
index 08cab86..5012017 100644
--- a/src/tests/zookeeper.cpp
+++ b/src/tests/zookeeper.cpp
@@ -33,8 +33,7 @@
 #include <stout/lambda.hpp>
 #include <stout/path.hpp>
 #include <stout/os.hpp>
-
-#include "common/lock.hpp"
+#include <stout/synchronized.hpp>
 
 #include "logging/logging.hpp"
 
@@ -121,9 +120,10 @@ void ZooKeeperTest::TestWatcher::process(
     int64_t sessionId,
     const string& path)
 {
-  Lock lock(&mutex);
-  events.push(Event(type, state, path));
-  pthread_cond_signal(&cond);
+  synchronized (mutex) {
+    events.push(Event(type, state, path));
+    pthread_cond_signal(&cond);
+  }
 }
 
 
@@ -158,14 +158,15 @@ void ZooKeeperTest::TestWatcher::awaitCreated(const string& path)
 ZooKeeperTest::TestWatcher::Event
 ZooKeeperTest::TestWatcher::awaitEvent()
 {
-  Lock lock(&mutex);
-  while (true) {
-    while (events.empty()) {
-      pthread_cond_wait(&cond, &mutex);
+  synchronized (mutex) {
+    while (true) {
+      while (events.empty()) {
+        pthread_cond_wait(&cond, &mutex);
+      }
+      Event event = events.front();
+      events.pop();
+      return event;
     }
-    Event event = events.front();
-    events.pop();
-    return event;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f4aaa143/src/tests/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper.hpp b/src/tests/zookeeper.hpp
index d8f1cb3..32fc83b 100644
--- a/src/tests/zookeeper.hpp
+++ b/src/tests/zookeeper.hpp
@@ -19,7 +19,6 @@
 #ifndef __TESTS_ZOOKEEPER_HPP__
 #define __TESTS_ZOOKEEPER_HPP__
 
-#include <pthread.h>
 #include <stdint.h>
 
 #include <gtest/gtest.h>


[15/21] mesos git commit: Update linux fs to use synchronized.

Posted by be...@apache.org.
Update linux fs to use synchronized.

Review: https://reviews.apache.org/r/35095


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5b0eeb0b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5b0eeb0b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5b0eeb0b

Branch: refs/heads/master
Commit: 5b0eeb0b761343e61de5593fba062ee3229dad1a
Parents: eb33a57
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:05:33 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/linux/fs.cpp | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5b0eeb0b/src/linux/fs.cpp
----------------------------------------------------------------------
diff --git a/src/linux/fs.cpp b/src/linux/fs.cpp
index 1c9cf3f..568565f 100644
--- a/src/linux/fs.cpp
+++ b/src/linux/fs.cpp
@@ -26,12 +26,11 @@
 #include <stout/numify.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
 
 #include <stout/os/read.hpp>
 #include <stout/os/stat.hpp>
 
-#include "common/lock.hpp"
-
 #include "linux/fs.hpp"
 
 using std::string;
@@ -188,10 +187,9 @@ Try<MountTable> MountTable::read(const string& path)
     // Mutex for guarding calls into non-reentrant mount table
     // functions. We use a static local variable to avoid unused
     // variable warnings.
-    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+    static std::mutex mutex;
 
-    {
-      Lock lock(&mutex);
+    synchronized (mutex) {
       struct mntent* mntent = ::getmntent(file);
       if (mntent == NULL) {
         // NULL means the end of enties.
@@ -219,14 +217,12 @@ Try<FileSystemTable> FileSystemTable::read()
 {
   // Mutex for guarding calls into non-reentrant fstab functions. We
   // use a static local variable to avoid unused variable warnings.
-  static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+  static std::mutex mutex;
 
   FileSystemTable table;
 
   // Use locks since fstab functions are not thread-safe.
-  {
-    Lock lock(&mutex);
-
+  synchronized (mutex) {
     // Open file _PATH_FSTAB (/etc/fstab).
     if (::setfsent() == 0) {
       return Error("Failed to open file system table");


[13/21] mesos git commit: Update Mesos scheduler to use synchronized.

Posted by be...@apache.org.
Update Mesos scheduler to use synchronized.

Review: https://reviews.apache.org/r/35096


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b2d80474
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b2d80474
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b2d80474

Branch: refs/heads/master
Commit: b2d8047428228cbbea65f4af889d11e8918e2e96
Parents: 5b0eeb0
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:06:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 include/mesos/scheduler.hpp |   5 +-
 src/sched/sched.cpp         | 382 +++++++++++++++++++--------------------
 2 files changed, 193 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ee6b5c..0b54ffe 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,11 +19,10 @@
 #ifndef __MESOS_SCHEDULER_HPP__
 #define __MESOS_SCHEDULER_HPP__
 
-#include <functional>
-#include <queue>
-
 #include <pthread.h>
 
+#include <functional>
+#include <queue>
 #include <string>
 #include <vector>
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9423607..bc76c71 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -71,8 +71,6 @@
 
 #include "authentication/cram_md5/authenticatee.hpp"
 
-#include "common/lock.hpp"
-
 #include "local/flags.hpp"
 #include "local/local.hpp"
 
@@ -841,8 +839,9 @@ protected:
       send(master.get(), message);
     }
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   // NOTE: This function informs the master to stop attempting to send
@@ -866,8 +865,9 @@ protected:
       send(master.get(), message);
     }
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void killTask(const TaskID& taskId)
@@ -1507,156 +1507,156 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
 
 Status MesosSchedulerDriver::start()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_NOT_STARTED) {
-    return status;
-  }
-
-  if (detector == NULL) {
-    Try<MasterDetector*> detector_ = MasterDetector::create(url);
-
-    if (detector_.isError()) {
-      status = DRIVER_ABORTED;
-      string message = "Failed to create a master detector for '" +
-      master + "': " + detector_.error();
-      scheduler->error(this, message);
+  synchronized (mutex) {
+    if (status != DRIVER_NOT_STARTED) {
       return status;
     }
 
-    // Save the detector so we can delete it later.
-    detector = detector_.get();
-  }
+    if (detector == NULL) {
+      Try<MasterDetector*> detector_ = MasterDetector::create(url);
 
-  // Load scheduler flags.
-  internal::scheduler::Flags flags;
-  Try<Nothing> load = flags.load("MESOS_");
+      if (detector_.isError()) {
+        status = DRIVER_ABORTED;
+        string message = "Failed to create a master detector for '" +
+        master + "': " + detector_.error();
+        scheduler->error(this, message);
+        return status;
+      }
 
-  if (load.isError()) {
-    status = DRIVER_ABORTED;
-    scheduler->error(this, load.error());
-    return status;
-  }
+      // Save the detector so we can delete it later.
+      detector = detector_.get();
+    }
 
-  // Initialize modules. Note that since other subsystems may depend
-  // upon modules, we should initialize modules before anything else.
-  if (flags.modules.isSome()) {
-    Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
-    if (result.isError()) {
+    // Load scheduler flags.
+    internal::scheduler::Flags flags;
+    Try<Nothing> load = flags.load("MESOS_");
+
+    if (load.isError()) {
       status = DRIVER_ABORTED;
-      scheduler->error(this, "Error loading modules: " + result.error());
+      scheduler->error(this, load.error());
       return status;
     }
-  }
 
-  CHECK(process == NULL);
+    // Initialize modules. Note that since other subsystems may depend
+    // upon modules, we should initialize modules before anything else.
+    if (flags.modules.isSome()) {
+      Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
+      if (result.isError()) {
+        status = DRIVER_ABORTED;
+        scheduler->error(this, "Error loading modules: " + result.error());
+        return status;
+      }
+    }
 
-  if (credential == NULL) {
-    process = new SchedulerProcess(
-        this,
-        scheduler,
-        framework,
-        None(),
-        implicitAcknowlegements,
-        schedulerId,
-        detector,
-        flags,
-        &mutex,
-        &cond);
-  } else {
-    const Credential& cred = *credential;
-    process = new SchedulerProcess(
-        this,
-        scheduler,
-        framework,
-        cred,
-        implicitAcknowlegements,
-        schedulerId,
-        detector,
-        flags,
-        &mutex,
-        &cond);
+    CHECK(process == NULL);
+
+    if (credential == NULL) {
+      process = new SchedulerProcess(
+          this,
+          scheduler,
+          framework,
+          None(),
+          implicitAcknowlegements,
+          schedulerId,
+          detector,
+          flags,
+          &mutex,
+          &cond);
+    } else {
+      const Credential& cred = *credential;
+      process = new SchedulerProcess(
+          this,
+          scheduler,
+          framework,
+          cred,
+          implicitAcknowlegements,
+          schedulerId,
+          detector,
+          flags,
+          &mutex,
+          &cond);
+    }
+
+    spawn(process);
+
+    return status = DRIVER_RUNNING;
   }
-
-  spawn(process);
-
-  return status = DRIVER_RUNNING;
 }
 
 
 Status MesosSchedulerDriver::stop(bool failover)
 {
-  Lock lock(&mutex);
-
-  LOG(INFO) << "Asked to stop the driver";
+  synchronized (mutex) {
+    LOG(INFO) << "Asked to stop the driver";
 
-  if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
-    VLOG(1) << "Ignoring stop because the status of the driver is "
-            << Status_Name(status);
-    return status;
-  }
+    if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+      VLOG(1) << "Ignoring stop because the status of the driver is "
+              << Status_Name(status);
+      return status;
+    }
 
-  // 'process' might be NULL if the driver has failed to instantiate
-  // it due to bad parameters (e.g. error in creating the detector
-  // or loading flags).
-  if (process != NULL) {
-    process->running =  false;
-    dispatch(process, &SchedulerProcess::stop, failover);
-  }
+    // 'process' might be NULL if the driver has failed to instantiate
+    // it due to bad parameters (e.g. error in creating the detector
+    // or loading flags).
+    if (process != NULL) {
+      process->running =  false;
+      dispatch(process, &SchedulerProcess::stop, failover);
+    }
 
-  // TODO(benh): It might make more sense to clean up our local
-  // cluster here than in the destructor. However, what would be even
-  // better is to allow multiple local clusters to exist (i.e. not use
-  // global vars in local.cpp) so that ours can just be an instance
-  // variable in MesosSchedulerDriver.
+    // TODO(benh): It might make more sense to clean up our local
+    // cluster here than in the destructor. However, what would be
+    // even better is to allow multiple local clusters to exist (i.e.
+    // not use global vars in local.cpp) so that ours can just be an
+    // instance variable in MesosSchedulerDriver.
 
-  bool aborted = status == DRIVER_ABORTED;
+    bool aborted = status == DRIVER_ABORTED;
 
-  status = DRIVER_STOPPED;
+    status = DRIVER_STOPPED;
 
-  return aborted ? DRIVER_ABORTED : status;
+    return aborted ? DRIVER_ABORTED : status;
+  }
 }
 
 
 Status MesosSchedulerDriver::abort()
 {
-  Lock lock(&mutex);
-
-  LOG(INFO) << "Asked to abort the driver";
+  synchronized (mutex) {
+    LOG(INFO) << "Asked to abort the driver";
 
-  if (status != DRIVER_RUNNING) {
-    VLOG(1) << "Ignoring abort because the status of the driver is "
-            << Status_Name(status);
-    return status;
-  }
+    if (status != DRIVER_RUNNING) {
+      VLOG(1) << "Ignoring abort because the status of the driver is "
+              << Status_Name(status);
+      return status;
+    }
 
-  CHECK_NOTNULL(process);
-  process->running = false;
+    CHECK_NOTNULL(process);
+    process->running = false;
 
-  // Dispatching here ensures that we still process the outstanding
-  // requests *from* the scheduler, since those do proceed when
-  // aborted is true.
-  dispatch(process, &SchedulerProcess::abort);
+    // Dispatching here ensures that we still process the outstanding
+    // requests *from* the scheduler, since those do proceed when
+    // aborted is true.
+    dispatch(process, &SchedulerProcess::abort);
 
-  return status = DRIVER_ABORTED;
+    return status = DRIVER_ABORTED;
+  }
 }
 
 
 Status MesosSchedulerDriver::join()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  while (status == DRIVER_RUNNING) {
-    pthread_cond_wait(&cond, &mutex);
-  }
+    while (status == DRIVER_RUNNING) {
+      pthread_cond_wait(&cond, &mutex);
+    }
 
-  CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+    CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1669,17 +1669,17 @@ Status MesosSchedulerDriver::run()
 
 Status MesosSchedulerDriver::killTask(const TaskID& taskId)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::killTask, taskId);
+    dispatch(process, &SchedulerProcess::killTask, taskId);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1700,17 +1700,17 @@ Status MesosSchedulerDriver::launchTasks(
     const vector<TaskInfo>& tasks,
     const Filters& filters)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
+    dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1719,22 +1719,22 @@ Status MesosSchedulerDriver::acceptOffers(
     const vector<Offer::Operation>& operations,
     const Filters& filters)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(
-      process,
-      &SchedulerProcess::acceptOffers,
-      offerIds,
-      operations,
-      filters);
+    dispatch(
+        process,
+        &SchedulerProcess::acceptOffers,
+        offerIds,
+        operations,
+        filters);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1751,40 +1751,40 @@ Status MesosSchedulerDriver::declineOffer(
 
 Status MesosSchedulerDriver::reviveOffers()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::reviveOffers);
+    dispatch(process, &SchedulerProcess::reviveOffers);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::acknowledgeStatusUpdate(
     const TaskStatus& taskStatus)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  // TODO(bmahler): Should this use abort() instead?
-  if (implicitAcknowlegements) {
-    ABORT("Cannot call acknowledgeStatusUpdate:"
-          " Implicit acknowledgements are enabled");
-  }
+    // TODO(bmahler): Should this use abort() instead?
+    if (implicitAcknowlegements) {
+      ABORT("Cannot call acknowledgeStatusUpdate:"
+            " Implicit acknowledgements are enabled");
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+    dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1793,50 +1793,50 @@ Status MesosSchedulerDriver::sendFrameworkMessage(
     const SlaveID& slaveId,
     const string& data)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::sendFrameworkMessage,
-           executorId, slaveId, data);
+    dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+            executorId, slaveId, data);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::reconcileTasks(
     const vector<TaskStatus>& statuses)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
+    dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::requestResources(
     const vector<Request>& requests)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::requestResources, requests);
+    dispatch(process, &SchedulerProcess::requestResources, requests);
 
-  return status;
+    return status;
+  }
 }


[02/21] mesos git commit: Forward into _Some constructor to elide copy.

Posted by be...@apache.org.
Forward into _Some constructor to elide copy.

Review: https://reviews.apache.org/r/34517


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93b60abb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93b60abb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93b60abb

Branch: refs/heads/master
Commit: 93b60abb4b470a644fd0a8d51708d8fddf11b1a7
Parents: 763364e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 04:52:16 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:42:59 2015 -0700

----------------------------------------------------------------------
 .../libprocess/3rdparty/stout/include/stout/some.hpp     | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/93b60abb/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
index 1a71ac4..7c5515c 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/some.hpp
@@ -14,6 +14,9 @@
 #ifndef __STOUT_SOME_HPP__
 #define __STOUT_SOME_HPP__
 
+#include <type_traits>
+#include <utility>
+
 // A useful type that can be used to represent an Option or Result.
 //
 // Examples:
@@ -30,16 +33,16 @@
 template <typename T>
 struct _Some
 {
-  _Some(T _t) : t(_t) {}
+  _Some(T _t) : t(std::move(_t)) {}
 
-  const T t;
+  T t;
 };
 
 
 template <typename T>
-_Some<T> Some(T t)
+_Some<typename std::decay<T>::type> Some(T&& t)
 {
-  return _Some<T>(t);
+  return _Some<typename std::decay<T>::type>(std::forward<T>(t));
 }
 
 #endif // __STOUT_SOME_HPP__


[04/21] mesos git commit: Update libprocess gmock to use synchronized.

Posted by be...@apache.org.
Update libprocess gmock to use synchronized.

Review: https://reviews.apache.org/r/35091


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1a13ad8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1a13ad8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1a13ad8

Branch: refs/heads/master
Commit: e1a13ad88cae3b896194a92a9766a430ba08286f
Parents: 9cb1283
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:03:40 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/gmock.hpp | 94 +++++++++++-----------
 1 file changed, 45 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1a13ad8/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index 6adc034..0fd3f8c 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -1,8 +1,6 @@
 #ifndef __PROCESS_GMOCK_HPP__
 #define __PROCESS_GMOCK_HPP__
 
-#include <pthread.h>
-
 #include <gmock/gmock.h>
 
 #include <process/dispatch.hpp>
@@ -12,6 +10,7 @@
 
 #include <stout/exit.hpp>
 #include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
 
 // NOTE: The gmock library relies on std::tr1::tuple. The gmock
 // library provides multiple possible 'tuple' implementations but it
@@ -222,19 +221,7 @@ public:
 class TestsFilter : public Filter
 {
 public:
-  TestsFilter()
-  {
-    // We use a recursive mutex here in the event that satisfying the
-    // future created in FutureMessage or FutureDispatch via the
-    // FutureArgField or FutureSatisfy actions invokes callbacks (from
-    // Future::then or Future::onAny, etc) that themselves invoke
-    // FutureDispatch or FutureMessage.
-    pthread_mutexattr_t attr;
-    pthread_mutexattr_init(&attr);
-    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-    pthread_mutex_init(&mutex, &attr);
-    pthread_mutexattr_destroy(&attr);
-  }
+  TestsFilter() = default;
 
   virtual bool filter(const MessageEvent& event) { return handle(event); }
   virtual bool filter(const DispatchEvent& event) { return handle(event); }
@@ -244,14 +231,19 @@ public:
   template <typename T>
   bool handle(const T& t)
   {
-    pthread_mutex_lock(&mutex);
-    bool drop = mock.filter(t);
-    pthread_mutex_unlock(&mutex);
-    return drop;
+    synchronized (mutex) {
+      return mock.filter(t);
+    }
   }
 
   MockFilter mock;
-  pthread_mutex_t mutex;
+
+  // We use a recursive mutex here in the event that satisfying the
+  // future created in FutureMessage or FutureDispatch via the
+  // FutureArgField or FutureSatisfy actions invokes callbacks (from
+  // Future::then or Future::onAny, etc) that themselves invoke
+  // FutureDispatch or FutureMessage.
+  std::recursive_mutex mutex;
 };
 
 
@@ -337,14 +329,17 @@ template <typename Name, typename From, typename To>
 Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
   Future<Message> future;
-  EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-    .With(MessageMatcher(name, from, to))
-    .WillOnce(testing::DoAll(FutureArgField<0>(&MessageEvent::message, &future),
-                             testing::Return(drop)))
-    .RetiresOnSaturation(); // Don't impose any subsequent expectations.
-  pthread_mutex_unlock(&filter->mutex);
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from, to))
+      .WillOnce(testing::DoAll(FutureArgField<0>(
+                                   &MessageEvent::message,
+                                   &future),
+                               testing::Return(drop)))
+      .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+  }
+
   return future;
 }
 
@@ -353,14 +348,15 @@ template <typename PID, typename Method>
 Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
   Future<Nothing> future;
-  EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
-    .With(DispatchMatcher(pid, method))
-    .WillOnce(testing::DoAll(FutureSatisfy(&future),
-                             testing::Return(drop)))
-    .RetiresOnSaturation(); // Don't impose any subsequent expectations.
-  pthread_mutex_unlock(&filter->mutex);
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+      .With(DispatchMatcher(pid, method))
+      .WillOnce(testing::DoAll(FutureSatisfy(&future),
+                              testing::Return(drop)))
+      .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+  }
+
   return future;
 }
 
@@ -369,11 +365,11 @@ template <typename Name, typename From, typename To>
 void DropMessages(Name name, From from, To to)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
-  EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-    .With(MessageMatcher(name, from, to))
-    .WillRepeatedly(testing::Return(true));
-  pthread_mutex_unlock(&filter->mutex);
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from, to))
+      .WillRepeatedly(testing::Return(true));
+  }
 }
 
 
@@ -381,11 +377,11 @@ template <typename Name, typename From, typename To>
 void ExpectNoFutureMessages(Name name, From from, To to)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
-  EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
-    .With(MessageMatcher(name, from, to))
-    .Times(0);
-  pthread_mutex_unlock(&filter->mutex);
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(MessageMatcher(name, from, to))
+      .Times(0);
+  }
 }
 
 
@@ -393,11 +389,11 @@ template <typename PID, typename Method>
 void DropDispatches(PID pid, Method method)
 {
   TestsFilter* filter = FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
-  EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
-    .With(DispatchMatcher(pid, method))
-    .WillRepeatedly(testing::Return(true));
-  pthread_mutex_unlock(&filter->mutex);
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const DispatchEvent&>()))
+      .With(DispatchMatcher(pid, method))
+      .WillRepeatedly(testing::Return(true));
+  }
 }
 
 } // namespace process {


[05/21] mesos git commit: Update libprocess Process to use synchronized.

Posted by be...@apache.org.
Update libprocess Process to use synchronized.

Review: https://reviews.apache.org/r/35090


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cb1283b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cb1283b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cb1283b

Branch: refs/heads/master
Commit: 9cb1283bcd942574bea07d0cf9b6748ae3869cc6
Parents: 9e7f64a
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:58:13 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/process.hpp | 20 ++++-----------
 3rdparty/libprocess/src/process.cpp             | 26 ++++----------------
 2 files changed, 10 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index e70dd38..6a0b21d 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -2,7 +2,6 @@
 #define __PROCESS_PROCESS_HPP__
 
 #include <stdint.h>
-#include <pthread.h>
 
 #include <map>
 #include <queue>
@@ -20,6 +19,7 @@
 #include <stout/duration.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/thread.hpp>
 
 namespace process {
@@ -180,24 +180,14 @@ protected:
     assets[name] = asset;
   }
 
-  void lock()
-  {
-    pthread_mutex_lock(&m);
-  }
-
-  void unlock()
-  {
-    pthread_mutex_unlock(&m);
-  }
-
   template<typename T>
   size_t eventCount()
   {
     size_t count = 0U;
 
-    lock();
-    count = std::count_if(events.begin(), events.end(), isEventType<T>);
-    unlock();
+    synchronized (mutex) {
+      count = std::count_if(events.begin(), events.end(), isEventType<T>);
+    }
 
     return count;
   }
@@ -226,7 +216,7 @@ private:
 
   // Mutex protecting internals.
   // TODO(benh): Consider replacing with a spinlock, on multi-core systems.
-  pthread_mutex_t m;
+  std::recursive_mutex mutex;
 
   // Enqueue the specified message, request, or function call.
   void enqueue(Event* event, bool inject = false);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index aadd7bb..c2baa6c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2147,8 +2147,7 @@ void ProcessManager::resume(ProcessBase* process)
   while (!terminate && !blocked) {
     Event* event = NULL;
 
-    process->lock();
-    {
+    synchronized (process->mutex) {
       if (process->events.size() > 0) {
         event = process->events.front();
         process->events.pop_front();
@@ -2158,7 +2157,6 @@ void ProcessManager::resume(ProcessBase* process)
         blocked = true;
       }
     }
-    process->unlock();
 
     if (!blocked) {
       CHECK(event != NULL);
@@ -2251,13 +2249,11 @@ void ProcessManager::cleanup(ProcessBase* process)
   // another process that gets spawned with the same PID.
   deque<Event*> events;
 
-  process->lock();
-  {
+  synchronized (process->mutex) {
     process->state = ProcessBase::TERMINATING;
     events = process->events;
     process->events.clear();
   }
-  process->unlock();
 
   // Delete pending events.
   while (!events.empty()) {
@@ -2279,8 +2275,7 @@ void ProcessManager::cleanup(ProcessBase* process)
       __sync_synchronize();
     }
 
-    process->lock();
-    {
+    synchronized (process->mutex) {
       CHECK(process->events.empty());
 
       processes.erase(process->pid.id);
@@ -2296,7 +2291,6 @@ void ProcessManager::cleanup(ProcessBase* process)
       CHECK(process->refs == 0);
       process->state = ProcessBase::TERMINATED;
     }
-    process->unlock();
 
     // Note that we don't remove the process from the clock during
     // cleanup, but rather the clock is reset for a process when it is
@@ -2619,13 +2613,11 @@ Future<Response> ProcessManager::__processes__(const Request&)
         JSON::Array* events;
       } visitor(&events);
 
-      process->lock();
-      {
+      synchronized (process->mutex) {
         foreach (Event* event, process->events) {
           event->visit(&visitor);
         }
       }
-      process->unlock();
 
       object.values["events"] = events;
       array.values.push_back(object);
@@ -2642,12 +2634,6 @@ ProcessBase::ProcessBase(const string& id)
 
   state = ProcessBase::BOTTOM;
 
-  pthread_mutexattr_t attr;
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-  pthread_mutex_init(&m, &attr);
-  pthread_mutexattr_destroy(&attr);
-
   refs = 0;
 
   pid.id = id != "" ? id : ID::generate();
@@ -2669,8 +2655,7 @@ void ProcessBase::enqueue(Event* event, bool inject)
 {
   CHECK(event != NULL);
 
-  lock();
-  {
+  synchronized (mutex) {
     if (state != TERMINATING && state != TERMINATED) {
       if (!inject) {
         events.push_back(event);
@@ -2690,7 +2675,6 @@ void ProcessBase::enqueue(Event* event, bool inject)
       delete event;
     }
   }
-  unlock();
 }
 
 


[18/21] mesos git commit: mesos: Use CHECK_SOME, CHECK_NONE, CHECK_ERROR.

Posted by be...@apache.org.
mesos: Use CHECK_SOME, CHECK_NONE, CHECK_ERROR.

Used `grep -r "CHECK([^\!].*\.isNone())" .` to find the instances that
look like `CHECK(x.isNone());`

Review: https://reviews.apache.org/r/35426


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26527296
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26527296
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26527296

Branch: refs/heads/master
Commit: 26527296f1e4ffc53a4545a63e1e6e0cee06e9f9
Parents: 9823648
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 07:29:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/launcher/fetcher.cpp                               |  2 +-
 src/linux/cgroups.cpp                                  |  2 +-
 src/master/registrar.cpp                               |  2 +-
 src/slave/containerizer/isolators/cgroups/cpushare.cpp |  2 +-
 src/slave/containerizer/isolators/cgroups/mem.cpp      |  2 +-
 src/slave/status_update_manager.cpp                    |  8 ++++----
 src/state/leveldb.cpp                                  |  4 ++--
 src/state/zookeeper.cpp                                |  6 +++---
 src/zookeeper/contender.cpp                            |  2 +-
 src/zookeeper/group.cpp                                | 10 +++++-----
 10 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index a3d27dc..8aee490 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -385,7 +385,7 @@ int main(int argc, char* argv[])
   logging::initialize(argv[0], flags, true); // Catch signals.
 
   const Option<std::string> jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
-  CHECK(jsonFetcherInfo.isSome())
+  CHECK_SOME(jsonFetcherInfo)
     << "Missing MESOS_FETCHER_INFO environment variable";
 
   LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a612fab..6a87ac4 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -2320,7 +2320,7 @@ private:
 
   void _listen(const process::Future<uint64_t>& future)
   {
-    CHECK(error.isNone());
+    CHECK_NONE(error);
 
     if (future.isReady()) {
       value_ += future.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 3fde8fa..b871229 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -419,7 +419,7 @@ void RegistrarProcess::update()
   }
 
   CHECK(!updating);
-  CHECK(error.isNone());
+  CHECK_NONE(error);
   CHECK_SOME(variable);
 
   // Time how long it takes to apply the operations.

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/containerizer/isolators/cgroups/cpushare.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/cpushare.cpp b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
index 5bd3525..21e4284 100644
--- a/src/slave/containerizer/isolators/cgroups/cpushare.cpp
+++ b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
@@ -315,7 +315,7 @@ Future<Nothing> CgroupsCpushareIsolatorProcess::isolate(
 
   Info* info = CHECK_NOTNULL(infos[containerId]);
 
-  CHECK(info->pid.isNone());
+  CHECK_NONE(info->pid);
   info->pid = pid;
 
   foreach (const string& subsystem, subsystems) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/containerizer/isolators/cgroups/mem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp
index 7fb6c8a..9d65bf5 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.cpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.cpp
@@ -299,7 +299,7 @@ Future<Nothing> CgroupsMemIsolatorProcess::isolate(
 
   Info* info = CHECK_NOTNULL(infos[containerId]);
 
-  CHECK(info->pid.isNone());
+  CHECK_NONE(info->pid);
   info->pid = pid;
 
   Try<Nothing> assign = cgroups::assign(hierarchy, info->cgroup, pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 1d7c4d0..35b943b 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -352,7 +352,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
   // Forward the status update to the master if this is the first in the stream.
   // Subsequent status updates will get sent in 'acknowledgement()'.
   if (!paused && stream->pending.size() == 1) {
-    CHECK(stream->timeout.isNone());
+    CHECK_NONE(stream->timeout);
     const Result<StatusUpdate>& next = stream->next();
     if (next.isError()) {
       return Failure(next.error());
@@ -470,7 +470,7 @@ void StatusUpdateManagerProcess::timeout(const Duration& duration)
     foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
       CHECK_NOTNULL(stream);
       if (!stream->pending.empty()) {
-        CHECK(stream->timeout.isSome());
+        CHECK_SOME(stream->timeout);
         if (stream->timeout.get().expired()) {
           const StatusUpdate& update = stream->pending.front();
           LOG(WARNING) << "Resending status update " << update;
@@ -814,7 +814,7 @@ Try<Nothing> StatusUpdateStream::handle(
     const StatusUpdate& update,
     const StatusUpdateRecord::Type& type)
 {
-  CHECK(error.isNone());
+  CHECK_NONE(error);
 
   // Checkpoint the update if necessary.
   if (checkpoint) {
@@ -850,7 +850,7 @@ void StatusUpdateStream::_handle(
     const StatusUpdate& update,
     const StatusUpdateRecord::Type& type)
 {
-  CHECK(error.isNone());
+  CHECK_NONE(error);
 
   if (type == StatusUpdateRecord::UPDATE) {
     // Record this update.

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 4303df3..14a1807 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -206,7 +206,7 @@ Future<bool> LevelDBStorageProcess::expunge(const Entry& entry)
 
 Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
 {
-  CHECK(error.isNone());
+  CHECK_NONE(error);
 
   leveldb::ReadOptions options;
 
@@ -234,7 +234,7 @@ Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
 
 Try<bool> LevelDBStorageProcess::write(const Entry& entry)
 {
-  CHECK(error.isNone());
+  CHECK_NONE(error);
 
   leveldb::WriteOptions options;
   options.sync = true;

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index d355bd7..9151d55 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -426,7 +426,7 @@ Result<std::set<string> > ZooKeeperStorageProcess::doNames()
 
 Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
 {
-  CHECK(error.isNone()) << ": " << error.get();
+  CHECK_NONE(error) << ": " << error.get();
   CHECK(state == CONNECTED);
 
   string result;
@@ -460,7 +460,7 @@ Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
 Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry,
                                             const UUID& uuid)
 {
-  CHECK(error.isNone()) << ": " << error.get();
+  CHECK_NONE(error) << ": " << error.get();
   CHECK(state == CONNECTED);
 
   // Serialize to make sure we're under the 1 MB limit.
@@ -558,7 +558,7 @@ Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry,
 
 Result<bool> ZooKeeperStorageProcess::doExpunge(const Entry& entry)
 {
-  CHECK(error.isNone()) << ": " << error.get();
+  CHECK_NONE(error) << ": " << error.get();
   CHECK(state == CONNECTED);
 
   string result;

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index 55cb7a1..3255ef0 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -226,7 +226,7 @@ void LeaderContenderProcess::joined()
   CHECK(!candidacy.isDiscarded());
 
   // Cannot be watching because the candidacy is not obtained yet.
-  CHECK(watching.isNone());
+  CHECK_NONE(watching);
 
   CHECK_SOME(contending);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/26527296/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 173caa8..33c56da 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -267,7 +267,7 @@ Future<set<Group::Membership> > GroupProcess::watch(
       // Non-retryable error.
       return Failure(cached.error());
     } else if (!cached.get()) {
-      CHECK(memberships.isNone());
+      CHECK_NONE(memberships);
 
       // Try again later.
       if (!retrying) {
@@ -430,7 +430,7 @@ void GroupProcess::reconnecting(int64_t sessionId)
   // we create a local timer and "expire" our session prematurely if
   // we haven't reconnected within the session expiration time out.
   // The timer can be reset if the connection is restored.
-  CHECK(timer.isNone());
+  CHECK_NONE(timer);
 
   // Use the negotiated session timeout for the reconnect timer.
   timer = delay(zk->getSessionTimeout(),
@@ -533,7 +533,7 @@ void GroupProcess::updated(int64_t sessionId, const string& path)
   if (cached.isError()) {
     abort(cached.error()); // Cancel everything pending.
   } else if (!cached.get()) {
-    CHECK(memberships.isNone());
+    CHECK_NONE(memberships);
 
     // Try again later.
     if (!retrying) {
@@ -868,7 +868,7 @@ Try<bool> GroupProcess::sync()
   if (memberships.isNone()) {
     Try<bool> cached = cache();
     if (cached.isError() || !cached.get()) {
-      CHECK(memberships.isNone());
+      CHECK_NONE(memberships);
       return cached;
     } else {
       update(); // Update any pending watches.
@@ -889,7 +889,7 @@ void GroupProcess::retry(const Duration& duration)
   // We cancel the retries when the group aborts and when its ZK
   // session expires so 'retrying' should be false in the condition
   // check above.
-  CHECK(error.isNone());
+  CHECK_NONE(error);
 
   // In order to be retrying, we should be at least CONNECTED.
   CHECK(state == CONNECTED || state == AUTHENTICATED || state == READY)


[19/21] mesos git commit: Update Mesos executor to use synchronized.

Posted by be...@apache.org.
Update Mesos executor to use synchronized.

Review: https://reviews.apache.org/r/35097


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8939609d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8939609d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8939609d

Branch: refs/heads/master
Commit: 8939609d403aa1043d637cc03647c4ee40478b20
Parents: b2d8047
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:12:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp | 287 +++++++++++++++++++++++++------------------------
 1 file changed, 145 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8939609d/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 0dfd5a6..930dda9 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -43,9 +43,9 @@
 #include <stout/os.hpp>
 #include <stout/stopwatch.hpp>
 #include <stout/stringify.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/uuid.hpp>
 
-#include "common/lock.hpp"
 #include "common/protobuf_utils.hpp"
 
 #include "logging/flags.hpp"
@@ -404,8 +404,9 @@ protected:
   {
     terminate(self());
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void abort()
@@ -413,8 +414,9 @@ protected:
     LOG(INFO) << "Deactivating the executor libprocess";
     CHECK(aborted);
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void _recoveryTimeout(UUID _connection)
@@ -611,173 +613,174 @@ MesosExecutorDriver::~MesosExecutorDriver()
 
 Status MesosExecutorDriver::start()
 {
-  Lock lock(&mutex);
+  synchronized (mutex) {
+    if (status != DRIVER_NOT_STARTED) {
+      return status;
+    }
 
-  if (status != DRIVER_NOT_STARTED) {
-    return status;
-  }
+    // Set stream buffering mode to flush on newlines so that we
+    // capture logs from user processes even when output is redirected
+    // to a file.
+    setvbuf(stdout, 0, _IOLBF, 0);
+    setvbuf(stderr, 0, _IOLBF, 0);
 
-  // Set stream buffering mode to flush on newlines so that we capture logs
-  // from user processes even when output is redirected to a file.
-  setvbuf(stdout, 0, _IOLBF, 0);
-  setvbuf(stderr, 0, _IOLBF, 0);
+    bool local;
 
-  bool local;
+    UPID slave;
+    SlaveID slaveId;
+    FrameworkID frameworkId;
+    ExecutorID executorId;
+    string workDirectory;
+    bool checkpoint;
 
-  UPID slave;
-  SlaveID slaveId;
-  FrameworkID frameworkId;
-  ExecutorID executorId;
-  string workDirectory;
-  bool checkpoint;
-
-  Option<string> value;
-  std::istringstream iss;
+    Option<string> value;
+    std::istringstream iss;
 
-  // Check if this is local (for example, for testing).
-  local = os::getenv("MESOS_LOCAL").isSome();
+    // Check if this is local (for example, for testing).
+    local = os::getenv("MESOS_LOCAL").isSome();
 
-  // Get slave PID from environment.
-  value = os::getenv("MESOS_SLAVE_PID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
-  }
+    // Get slave PID from environment.
+    value = os::getenv("MESOS_SLAVE_PID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
+    }
 
-  slave = UPID(value.get());
-  CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
+    slave = UPID(value.get());
+    CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
 
-  // Get slave ID from environment.
-  value = os::getenv("MESOS_SLAVE_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
-  }
-  slaveId.set_value(value.get());
+    // Get slave ID from environment.
+    value = os::getenv("MESOS_SLAVE_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
+    }
+    slaveId.set_value(value.get());
 
-  // Get framework ID from environment.
-  value = os::getenv("MESOS_FRAMEWORK_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
-  }
-  frameworkId.set_value(value.get());
+    // Get framework ID from environment.
+    value = os::getenv("MESOS_FRAMEWORK_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
+    }
+    frameworkId.set_value(value.get());
 
-  // Get executor ID from environment.
-  value = os::getenv("MESOS_EXECUTOR_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
-  }
-  executorId.set_value(value.get());
+    // Get executor ID from environment.
+    value = os::getenv("MESOS_EXECUTOR_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
+    }
+    executorId.set_value(value.get());
 
-  // Get working directory from environment.
-  value = os::getenv("MESOS_DIRECTORY");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
-  }
-  workDirectory = value.get();
+    // Get working directory from environment.
+    value = os::getenv("MESOS_DIRECTORY");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
+    }
+    workDirectory = value.get();
 
-  // Get checkpointing status from environment.
-  value = os::getenv("MESOS_CHECKPOINT");
-  checkpoint = value.isSome() && value.get() == "1";
+    // Get checkpointing status from environment.
+    value = os::getenv("MESOS_CHECKPOINT");
+    checkpoint = value.isSome() && value.get() == "1";
 
-  Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+    Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
 
-  // Get the recovery timeout if checkpointing is enabled.
-  if (checkpoint) {
-    value = os::getenv("MESOS_RECOVERY_TIMEOUT");
+    // Get the recovery timeout if checkpointing is enabled.
+    if (checkpoint) {
+      value = os::getenv("MESOS_RECOVERY_TIMEOUT");
 
-    if (value.isSome()) {
-      Try<Duration> _recoveryTimeout = Duration::parse(value.get());
+      if (value.isSome()) {
+        Try<Duration> _recoveryTimeout = Duration::parse(value.get());
 
-      CHECK_SOME(_recoveryTimeout)
-          << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
-          << _recoveryTimeout.error();
+        CHECK_SOME(_recoveryTimeout)
+            << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
+            << _recoveryTimeout.error();
 
-      recoveryTimeout = _recoveryTimeout.get();
+        recoveryTimeout = _recoveryTimeout.get();
+      }
     }
-  }
 
-  CHECK(process == NULL);
-
-  process = new ExecutorProcess(
-      slave,
-      this,
-      executor,
-      slaveId,
-      frameworkId,
-      executorId,
-      local,
-      workDirectory,
-      checkpoint,
-      recoveryTimeout,
-      &mutex,
-      &cond);
-
-  spawn(process);
-
-  return status = DRIVER_RUNNING;
+    CHECK(process == NULL);
+
+    process = new ExecutorProcess(
+        slave,
+        this,
+        executor,
+        slaveId,
+        frameworkId,
+        executorId,
+        local,
+        workDirectory,
+        checkpoint,
+        recoveryTimeout,
+        &mutex,
+        &cond);
+
+    spawn(process);
+
+    return status = DRIVER_RUNNING;
+  }
 }
 
 
 Status MesosExecutorDriver::stop()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::stop);
+    dispatch(process, &ExecutorProcess::stop);
 
-  bool aborted = status == DRIVER_ABORTED;
+    bool aborted = status == DRIVER_ABORTED;
 
-  status = DRIVER_STOPPED;
+    status = DRIVER_STOPPED;
 
-  return aborted ? DRIVER_ABORTED : status;
+    return aborted ? DRIVER_ABORTED : status;
+  }
 }
 
 
 Status MesosExecutorDriver::abort()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  // We set the volatile aborted to true here to prevent any further
-  // messages from being processed in the ExecutorProcess. However,
-  // if abort() is called from another thread as the ExecutorProcess,
-  // there may be at most one additional message processed.
-  // TODO(bmahler): Use an atomic boolean.
-  process->aborted = true;
+    // We set the volatile aborted to true here to prevent any further
+    // messages from being processed in the ExecutorProcess. However,
+    // if abort() is called from another thread as the ExecutorProcess,
+    // there may be at most one additional message processed.
+    // TODO(bmahler): Use an atomic boolean.
+    process->aborted = true;
 
-  // Dispatching here ensures that we still process the outstanding
-  // requests *from* the executor, since those do proceed when
-  // aborted is true.
-  dispatch(process, &ExecutorProcess::abort);
+    // Dispatching here ensures that we still process the outstanding
+    // requests *from* the executor, since those do proceed when
+    // aborted is true.
+    dispatch(process, &ExecutorProcess::abort);
 
-  return status = DRIVER_ABORTED;
+    return status = DRIVER_ABORTED;
+  }
 }
 
 
 Status MesosExecutorDriver::join()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  while (status == DRIVER_RUNNING) {
-    pthread_cond_wait(&cond, &mutex);
-  }
+    while (status == DRIVER_RUNNING) {
+      pthread_cond_wait(&cond, &mutex);
+    }
 
-  CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+    CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -790,31 +793,31 @@ Status MesosExecutorDriver::run()
 
 Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
+    dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosExecutorDriver::sendFrameworkMessage(const string& data)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
+    dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
 
-  return status;
+    return status;
+  }
 }


[10/21] mesos git commit: Add depedendent include to synchronized.hpp.

Posted by be...@apache.org.
Add depedendent include to synchronized.hpp.

Review: https://reviews.apache.org/r/35088


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb8f88db
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb8f88db
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb8f88db

Branch: refs/heads/master
Commit: eb8f88db5c4e5f87159016267df6b9f771297eb1
Parents: 19b1960
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 05:54:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eb8f88db/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
index 9b11cbc..e40ec55 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -19,6 +19,8 @@
 #include <mutex>
 #include <type_traits>
 
+#include <glog/logging.h>
+
 #include <stout/preprocessor.hpp>
 
 // An RAII class for the 'synchronized(m)' macro.


[08/21] mesos git commit: Update libprocess Gate to use synchronized.

Posted by be...@apache.org.
Update libprocess Gate to use synchronized.

Review: https://reviews.apache.org/r/35094


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb33a57f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb33a57f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb33a57f

Branch: refs/heads/master
Commit: eb33a57ff32297d885d444336da86762dc98d793
Parents: 149f42f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:51:59 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/gate.hpp | 38 ++++++++++++++---------------------
 1 file changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eb33a57f/3rdparty/libprocess/src/gate.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/gate.hpp b/3rdparty/libprocess/src/gate.hpp
index 69c906b..e5c9379 100644
--- a/3rdparty/libprocess/src/gate.hpp
+++ b/3rdparty/libprocess/src/gate.hpp
@@ -3,6 +3,8 @@
 
 // TODO(benh): Build implementation directly on-top-of futex's for Linux.
 
+#include <stout/synchronized.hpp>
+
 class Gate
 {
 public:
@@ -31,21 +33,21 @@ public:
   // all (if 'all' is true) of the threads waiting on it.
   void open(bool all = true)
   {
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       state++;
-      if (all) pthread_cond_broadcast(&cond);
-      else pthread_cond_signal(&cond);
+      if (all) {
+        pthread_cond_broadcast(&cond);
+      } else {
+        pthread_cond_signal(&cond);
+      }
     }
-    pthread_mutex_unlock(&mutex);
   }
 
   // Blocks the current thread until the gate's state changes from
   // the current state.
   void wait()
   {
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       waiters++;
       state_t old = state;
       while (old == state) {
@@ -53,7 +55,6 @@ public:
       }
       waiters--;
     }
-    pthread_mutex_unlock(&mutex);
   }
 
   // Gets the current state of the gate and notifies the gate about
@@ -61,14 +62,10 @@ public:
   // Call 'leave()' if no longer interested in the state change.
   state_t approach()
   {
-    state_t old;
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       waiters++;
-      old = state;
+      return state;
     }
-    pthread_mutex_unlock(&mutex);
-    return old;
   }
 
   // Blocks the current thread until the gate's state changes from
@@ -76,25 +73,22 @@ public:
   // calling 'approach()'.
   void arrive(state_t old)
   {
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       while (old == state) {
         pthread_cond_wait(&cond, &mutex);
       }
+
       waiters--;
     }
-    pthread_mutex_unlock(&mutex);
   }
 
   // Notifies the gate that a waiter (the current thread) is no
   // longer waiting for the gate's state change.
   void leave()
   {
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       waiters--;
     }
-    pthread_mutex_unlock(&mutex);
   }
 
   // Returns true if there is no one waiting on the gate's state
@@ -102,11 +96,9 @@ public:
   bool empty()
   {
     bool occupied = true;
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       occupied = waiters > 0 ? true : false;
     }
-    pthread_mutex_unlock(&mutex);
     return !occupied;
   }
 };


[11/21] mesos git commit: Improvements to Synchronized.

Posted by be...@apache.org.
Improvements to Synchronized.

(1) Simplify introducing new synchronization primitives.

Before:

```cpp
template <>
class Synchronized<bufferevent>
{
public:
  Synchronized(bufferevent* _bev) : bev(CHECK_NOTNULL(_bev))
  {
    bufferevent_lock(bev);
  }

  Synchronized(bufferevent** _bev) : Synchronized(*CHECK_NOTNULL(_bev)) {}

  ~Synchronized()
  {
    bufferevent_unlock(bev);
  }

  operator bool() const { return true; }

private:
  bufferevent* bev;
};
```

After:

```cpp
Synchronized<bufferevent> synchronize(bufferevent* bev) {
  return {
    bev,
    [](bufferevent* bev) { bufferevent_lock(bev); }
    [](bufferevent* bev) { bufferevent_unlock(bev); }
  };
}
```

(2) Enable `return` within `synchronized` and avoid the `control reaches end of non-void function` warning.

```cpp
int foo()
{
  int x = 42;
  std::mutex m;
  synchronized (m) {
    return x;
  }
}
```

Review: https://reviews.apache.org/r/35395


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/19b1960f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/19b1960f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/19b1960f

Branch: refs/heads/master
Commit: 19b1960ffd01a61e4e0d2b7b55f5aa60a9cf8738
Parents: c683b00
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:51:50 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 .../stout/include/stout/synchronized.hpp        | 156 ++++++++++++++-----
 1 file changed, 120 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/19b1960f/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
index 60eaf26..9b11cbc 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -19,57 +19,141 @@
 #include <mutex>
 #include <type_traits>
 
-// A helper class for the synchronized(m) macro. It is an RAII 'guard'
-// for a synchronization primitive 'T'. The general template handles
-// cases such as 'std::mutex' and 'std::recursive_mutex'.
+#include <stout/preprocessor.hpp>
+
+// An RAII class for the 'synchronized(m)' macro.
 template <typename T>
 class Synchronized
 {
 public:
-  Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
-  Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
+  explicit Synchronized(T* t, void (*acquire)(T*), void (*release)(T*))
+    : t_(CHECK_NOTNULL(t)), release_(release)
+  {
+    acquire(t_);
+  }
+
+  ~Synchronized() { release_(t_); }
 
-  ~Synchronized() { lock->unlock(); }
+  // NOTE: 'false' being returned here has no significance.
+  //       Refer to the NOTE for 'synchronized' at the bottom for why.
+  explicit operator bool() const { return false; }
 
-  operator bool() const { return true; }
 private:
-  T* lock;
+  T* t_;
+
+  void (*release_)(T*);
 };
 
 
-// A specialization of the Synchronized class for 'std::atomic_flag'.
-// This is necessary as the locking functions are different.
-template <>
-class Synchronized<std::atomic_flag>
+// The generic version handles mutexes which have 'lock' and 'unlock'
+// member functions such as 'std::mutex' and 'std::recursive_mutex'.
+template <typename T>
+Synchronized<T> synchronize(T* t)
 {
-public:
-  Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
-  {
-    while (flag->test_and_set(std::memory_order_acquire)) {}
-  }
-  Synchronized(std::atomic_flag** _flag)
-    : Synchronized(*CHECK_NOTNULL(_flag)) {}
+  return Synchronized<T>(
+    t,
+    [](T* t) { t->lock(); },
+    [](T* t) { t->unlock(); }
+  );
+}
 
-  ~Synchronized()
-  {
-    flag->clear(std::memory_order_release);
-  }
 
-  operator bool() const { return true; }
-private:
-  std::atomic_flag* flag;
-};
+// An overload of the 'synchronize' function for 'std::atomic_flag'.
+inline Synchronized<std::atomic_flag> synchronize(std::atomic_flag* lock)
+{
+  return Synchronized<std::atomic_flag>(
+    lock,
+    [](std::atomic_flag* lock) {
+      while (lock->test_and_set(std::memory_order_acquire)) {}
+    },
+    [](std::atomic_flag* lock) {
+      lock->clear(std::memory_order_release);
+    }
+  );
+}
+
+
+// An overload of the 'synchronize' function for 'pthread_mutex_t'.
+inline Synchronized<pthread_mutex_t> synchronize(pthread_mutex_t* mutex)
+{
+  return Synchronized<pthread_mutex_t>(
+    mutex,
+    [](pthread_mutex_t* mutex) {
+      pthread_mutex_lock(mutex);
+    },
+    [](pthread_mutex_t* mutex) {
+      pthread_mutex_unlock(mutex);
+    }
+  );
+}
+
+
+template <typename T>
+T* synchronized_get_pointer(T** t)
+{
+  return *CHECK_NOTNULL(t);
+}
+
+
+template <typename T>
+T* synchronized_get_pointer(T* t)
+{
+  return t;
+}
+
+
+// Macros to help generate "unique" identifiers for the
+// synchronization variable name and label. The line number gets
+// embedded which makes it unique enough, but not absolutely unique.
+// It shouldn't be a problem however, since it's very unlikely that
+// anyone would place multiple 'synchronized' blocks on one line.
+#define SYNCHRONIZED_PREFIX CAT(__synchronizer_, __LINE__)
+#define SYNCHRONIZED_VAR CAT(SYNCHRONIZED_PREFIX, _var__)
+#define SYNCHRONIZED_LABEL CAT(SYNCHRONIZED_PREFIX, _label__)
 
 
 // A macro for acquiring a scoped 'guard' on any type that can satisfy
-// the 'Synchronized' interface. Currently this includes 'std::mutex',
-// 'std::recursive_mutex' and 'std::atomic_flag'.
-// Example:
-//   std::mutex m;
-//   synchronized (m) {
-//     // Do something under the lock.
-//   }
-#define synchronized(m)                                                 \
-  if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
+// the 'Synchronized' interface. We support 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag' by default.
+//
+//   Example usage:
+//     std::mutex m;
+//     synchronized (m) {
+//       // Do something under the lock.
+//     }
+//
+// You can easily extend support for your synchronization primitive
+// here, by overloading the 'synchronize' function.
+//
+//   Example overload:
+//     inline Synchronized<bufferevent> synchronize(bufferevent* bev)
+//     {
+//       return Synchronized<bufferevent>(
+//         bev,
+//         [](bufferevent* bev) { bufferevent_lock(bev); },
+//         [](bufferevent* bev) { bufferevent_unlock(bev); },
+//       );
+//     }
+//
+// How it works: An instance of the synchronization primitive is
+// constructed inside the 'condition' of if statement. This variable
+// stays alive for the lifetime of the block. The trick with 'goto',
+// 'else' and the label allows the compiler to figure out that the
+// synchronized block will always execute. This allows us to return
+// within the synchronized block and avoid the
+// 'control reaches the end of a non-void function' warning.
+// Note that the variable declared inside the 'condition' of an if
+// statement is guaranteed to live through the 'else' clause as well.
+//
+// From Section 6.4.3:
+//   A name introduced by a declaration in a condition (either
+//   introduced by the decl-specifier-seq or the declarator of the
+//   condition) is in scope from its point of declaration until the
+//   end of the substatements controlled by the condition.
+#define synchronized(m)                                                     \
+  if (Synchronized<typename std::remove_pointer<decltype(m)>::type>         \
+        SYNCHRONIZED_VAR = ::synchronize(::synchronized_get_pointer(&m))) { \
+    goto SYNCHRONIZED_LABEL;                                                \
+  } else SYNCHRONIZED_LABEL:
 
 #endif // __STOUT_SYNCHRONIZED_HPP__


[03/21] mesos git commit: Update libprocess Once to use synchronized.

Posted by be...@apache.org.
Update libprocess Once to use synchronized.

Review: https://reviews.apache.org/r/35093


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/149f42fd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/149f42fd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/149f42fd

Branch: refs/heads/master
Commit: 149f42fde28bf6eabb3fef0eea6eb86b85ca53e7
Parents: dc17126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:14:50 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/once.hpp | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/149f42fd/3rdparty/libprocess/include/process/once.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/once.hpp b/3rdparty/libprocess/include/process/once.hpp
index 256ed07..2b81df3 100644
--- a/3rdparty/libprocess/include/process/once.hpp
+++ b/3rdparty/libprocess/include/process/once.hpp
@@ -4,6 +4,7 @@
 #include <process/future.hpp>
 
 #include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
 
 namespace process {
 
@@ -32,8 +33,7 @@ public:
   {
     bool result = false;
 
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       if (started) {
         while (!finished) {
           pthread_cond_wait(&cond, &mutex);
@@ -43,7 +43,6 @@ public:
         started = true;
       }
     }
-    pthread_mutex_unlock(&mutex);
 
     return result;
   }
@@ -51,14 +50,12 @@ public:
   // Transitions this Once instance to a 'done' state.
   void done()
   {
-    pthread_mutex_lock(&mutex);
-    {
+    synchronized (mutex) {
       if (started && !finished) {
         finished = true;
         pthread_cond_broadcast(&cond);
       }
     }
-    pthread_mutex_unlock(&mutex);
   }
 
 private:


[07/21] mesos git commit: Rebase master_contender_detector_tests with synchronized.

Posted by be...@apache.org.
Rebase master_contender_detector_tests with synchronized.

Review: https://reviews.apache.org/r/35092


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc17126a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc17126a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc17126a

Branch: refs/heads/master
Commit: dc17126a46f2e0db17ccdf84ce0d65683d7929a5
Parents: e1a13ad
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 06:10:49 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 src/tests/master_contender_detector_tests.cpp | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dc17126a/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index af6f15a..1b30eeb 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -282,14 +282,16 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
 
   process::TestsFilter* filter =
     process::FilterTestEventListener::instance()->install();
-  pthread_mutex_lock(&filter->mutex);
-
-  // Expect GroupProcess::join not getting called because
-  // ZooKeeperMasterContender directly returns.
-  EXPECT_CALL(filter->mock, filter(testing::A<const process::DispatchEvent&>()))
-    .With(DispatchMatcher(_, &GroupProcess::join))
-    .Times(0);
-  pthread_mutex_unlock(&filter->mutex);
+
+  synchronized (filter->mutex) {
+    // Expect GroupProcess::join not getting called because
+    // ZooKeeperMasterContender directly returns.
+    EXPECT_CALL(
+        filter->mock,
+        filter(testing::A<const process::DispatchEvent&>()))
+      .With(DispatchMatcher(_, &GroupProcess::join))
+      .Times(0);
+  }
 
   // Recontend and settle so that if ZooKeeperMasterContender is not
   // directly returning, GroupProcess::join is dispatched.


[20/21] mesos git commit: Update HookManager to use synchronized.

Posted by be...@apache.org.
Update HookManager to use synchronized.

Review: https://reviews.apache.org/r/35100


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a69aa74
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a69aa74
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a69aa74

Branch: refs/heads/master
Commit: 5a69aa74d5d464f53629d81b7200e67f818789b4
Parents: 51fca3f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:19:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/hook/manager.cpp | 172 ++++++++++++++++++++++++----------------------
 1 file changed, 91 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5a69aa74/src/hook/manager.cpp
----------------------------------------------------------------------
diff --git a/src/hook/manager.cpp b/src/hook/manager.cpp
index 54b0d34..5236035 100644
--- a/src/hook/manager.cpp
+++ b/src/hook/manager.cpp
@@ -16,8 +16,7 @@
  * limitations under the License.
  */
 
-#include <pthread.h>
-
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -31,7 +30,6 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 
-#include "common/lock.hpp"
 #include "hook/manager.hpp"
 #include "module/manager.hpp"
 
@@ -43,49 +41,52 @@ using mesos::modules::ModuleManager;
 namespace mesos {
 namespace internal {
 
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static std::mutex mutex;
 static hashmap<string, Hook*> availableHooks;
 
 
 Try<Nothing> HookManager::initialize(const string& hookList)
 {
-  Lock lock(&mutex);
-
-  const vector<string> hooks = strings::split(hookList, ",");
-  foreach (const string& hook, hooks) {
-    if (availableHooks.contains(hook)) {
-      return Error("Hook module '" + hook + "' already loaded");
-    }
-
-    if (!ModuleManager::contains<Hook>(hook)) {
-      return Error("No hook module named '" + hook + "' available");
-    }
-
-    // Let's create an instance of the hook module.
-    Try<Hook*> module = ModuleManager::create<Hook>(hook);
-    if (module.isError()) {
-      return Error(
-          "Failed to instantiate hook module '" + hook + "': " +
-          module.error());
+  synchronized (mutex) {
+    const vector<string> hooks = strings::split(hookList, ",");
+    foreach (const string& hook, hooks) {
+      if (availableHooks.contains(hook)) {
+        return Error("Hook module '" + hook + "' already loaded");
+      }
+
+      if (!ModuleManager::contains<Hook>(hook)) {
+        return Error("No hook module named '" + hook + "' available");
+      }
+
+      // Let's create an instance of the hook module.
+      Try<Hook*> module = ModuleManager::create<Hook>(hook);
+      if (module.isError()) {
+        return Error(
+            "Failed to instantiate hook module '" + hook + "': " +
+            module.error());
+      }
+
+      // Add the hook module to the list of available hooks.
+      availableHooks[hook] = module.get();
     }
-
-    // Add the hook module to the list of available hooks.
-    availableHooks[hook] = module.get();
   }
+
   return Nothing();
 }
 
 
 Try<Nothing> HookManager::unload(const std::string& hookName)
 {
-  Lock lock(&mutex);
-  if (!availableHooks.contains(hookName)) {
-    return Error(
-        "Error unloading hook module '" + hookName + "': module not loaded");
+  synchronized (mutex) {
+    if (!availableHooks.contains(hookName)) {
+      return Error(
+          "Error unloading hook module '" + hookName + "': module not loaded");
+    }
+
+    // Now remove the hook from the list of available hooks.
+    availableHooks.erase(hookName);
   }
 
-  // Now remove the hook from the list of available hooks.
-  availableHooks.erase(hookName);
   return Nothing();
 }
 
@@ -95,28 +96,33 @@ Labels HookManager::masterLaunchTaskLabelDecorator(
     const FrameworkInfo& frameworkInfo,
     const SlaveInfo& slaveInfo)
 {
-  Lock lock(&mutex);
-
-  // We need a mutable copy of the task info and set the new
-  // labels after each hook invocation. Otherwise, the last hook
-  // will be the only effective hook setting the labels.
-  TaskInfo taskInfo_ = taskInfo;
-
-  foreachpair (const string& name, Hook* hook, availableHooks) {
-    const Result<Labels>& result =
-      hook->masterLaunchTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
-
-    // NOTE: If the hook returns None(), the task labels won't be
-    // changed.
-    if (result.isSome()) {
-      taskInfo_.mutable_labels()->CopyFrom(result.get());
-    } else if (result.isError()) {
-      LOG(WARNING) << "Master label decorator hook failed for module '"
-                   << name << "': " << result.error();
+  synchronized (mutex) {
+    // We need a mutable copy of the task info and set the new
+    // labels after each hook invocation. Otherwise, the last hook
+    // will be the only effective hook setting the labels.
+    TaskInfo taskInfo_ = taskInfo;
+
+    foreachpair (const string& name, Hook* hook, availableHooks) {
+      const Result<Labels>& result =
+        hook->masterLaunchTaskLabelDecorator(
+            taskInfo_,
+            frameworkInfo,
+            slaveInfo);
+
+      // NOTE: If the hook returns None(), the task labels won't be
+      // changed.
+      if (result.isSome()) {
+        taskInfo_.mutable_labels()->CopyFrom(result.get());
+      } else if (result.isError()) {
+        LOG(WARNING) << "Master label decorator hook failed for module '"
+                    << name << "': " << result.error();
+      }
     }
+
+    return taskInfo_.labels();
   }
 
-  return taskInfo_.labels();
+  UNREACHABLE();
 }
 
 
@@ -125,49 +131,53 @@ Labels HookManager::slaveRunTaskLabelDecorator(
     const FrameworkInfo& frameworkInfo,
     const SlaveInfo& slaveInfo)
 {
-  Lock lock(&mutex);
-
-  TaskInfo taskInfo_ = taskInfo;
-
-  foreachpair (const string& name, Hook* hook, availableHooks) {
-    const Result<Labels>& result =
-      hook->slaveRunTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
-
-    // NOTE: If the hook returns None(), the task labels won't be
-    // changed.
-    if (result.isSome()) {
-      taskInfo_.mutable_labels()->CopyFrom(result.get());
-    } else if (result.isError()) {
-      LOG(WARNING) << "Slave label decorator hook failed for module '"
-                   << name << "': " << result.error();
+  synchronized (mutex) {
+    TaskInfo taskInfo_ = taskInfo;
+
+    foreachpair (const string& name, Hook* hook, availableHooks) {
+      const Result<Labels>& result =
+        hook->slaveRunTaskLabelDecorator(taskInfo_, frameworkInfo, slaveInfo);
+
+      // NOTE: If the hook returns None(), the task labels won't be
+      // changed.
+      if (result.isSome()) {
+        taskInfo_.mutable_labels()->CopyFrom(result.get());
+      } else if (result.isError()) {
+        LOG(WARNING) << "Slave label decorator hook failed for module '"
+                    << name << "': " << result.error();
+      }
     }
+
+    return taskInfo_.labels();
   }
 
-  return taskInfo_.labels();
+  UNREACHABLE();
 }
 
 
 Environment HookManager::slaveExecutorEnvironmentDecorator(
     ExecutorInfo executorInfo)
 {
-  Lock lock(&mutex);
-
-  foreachpair (const string& name, Hook* hook, availableHooks) {
-    const Result<Environment>& result =
-      hook->slaveExecutorEnvironmentDecorator(executorInfo);
-
-    // NOTE: If the hook returns None(), the environment won't be
-    // changed.
-    if (result.isSome()) {
-      executorInfo.mutable_command()->mutable_environment()->CopyFrom(
-          result.get());
-    } else if (result.isError()) {
-      LOG(WARNING) << "Slave environment decorator hook failed for module '"
-                   << name << "': " << result.error();
+  synchronized (mutex) {
+    foreachpair (const string& name, Hook* hook, availableHooks) {
+      const Result<Environment>& result =
+        hook->slaveExecutorEnvironmentDecorator(executorInfo);
+
+      // NOTE: If the hook returns None(), the environment won't be
+      // changed.
+      if (result.isSome()) {
+        executorInfo.mutable_command()->mutable_environment()->CopyFrom(
+            result.get());
+      } else if (result.isError()) {
+        LOG(WARNING) << "Slave environment decorator hook failed for module '"
+                    << name << "': " << result.error();
+      }
     }
+
+    return executorInfo.command().environment();
   }
 
-  return executorInfo.command().environment();
+  UNREACHABLE();
 }
 
 


[17/21] mesos git commit: Remove src/common/lock. Use synchronized instead.

Posted by be...@apache.org.
Remove src/common/lock. Use synchronized instead.

Review: https://reviews.apache.org/r/35102


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad378345
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad378345
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad378345

Branch: refs/heads/master
Commit: ad378345f7fe12d4c091d46d44f2485b09fba40e
Parents: f4aaa14
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:24:11 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/Makefile.am     |  2 --
 src/common/lock.cpp | 55 ------------------------------------------------
 src/common/lock.hpp | 45 ---------------------------------------
 3 files changed, 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3c44f01..884533e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -353,7 +353,6 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	common/attributes.cpp						\
 	common/date_utils.cpp						\
 	common/http.cpp							\
-	common/lock.cpp							\
 	common/protobuf_utils.cpp					\
 	common/resources.cpp						\
 	common/resources_utils.cpp					\
@@ -580,7 +579,6 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	common/date_utils.hpp						\
 	common/factory.hpp						\
 	common/http.hpp							\
-	common/lock.hpp							\
 	common/parse.hpp						\
 	common/protobuf_utils.hpp					\
 	common/resources_utils.hpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/common/lock.cpp
----------------------------------------------------------------------
diff --git a/src/common/lock.cpp b/src/common/lock.cpp
deleted file mode 100644
index bb8ea3a..0000000
--- a/src/common/lock.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "lock.hpp"
-
-namespace mesos {
-namespace internal {
-
-Lock::Lock(pthread_mutex_t* _mutex)
-  : mutex(_mutex), locked(false)
-{
-  lock();
-}
-
-
-void Lock::lock()
-{
-  if (!locked) {
-    pthread_mutex_lock(mutex);
-    locked = true;
-  }
-}
-
-
-void Lock::unlock()
-{
-  if (locked) {
-    locked = false;
-    pthread_mutex_unlock(mutex);
-  }
-}
-
-
-Lock::~Lock()
-{
-  unlock();
-}
-
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad378345/src/common/lock.hpp
----------------------------------------------------------------------
diff --git a/src/common/lock.hpp b/src/common/lock.hpp
deleted file mode 100644
index 988dff5..0000000
--- a/src/common/lock.hpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LOCK_HPP__
-#define __LOCK_HPP__
-
-#include <pthread.h>
-
-namespace mesos {
-namespace internal {
-
-// RAII class for locking pthread_mutexes.
-class Lock
-{
-public:
-  explicit Lock(pthread_mutex_t* _mutex);
-  ~Lock();
-
-  void lock();
-  void unlock();
-
-private:
-  pthread_mutex_t* mutex;
-  bool locked;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __LOCK_HPP__


[14/21] mesos git commit: Update cram_md5 to use synchronized.

Posted by be...@apache.org.
Update cram_md5 to use synchronized.

Review: https://reviews.apache.org/r/35099


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51fca3f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51fca3f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51fca3f7

Branch: refs/heads/master
Commit: 51fca3f7efa0fd8abd11c22d87092b13f2ba0318
Parents: 98039c9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:18:56 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/authentication/cram_md5/auxprop.cpp |  5 +++--
 src/authentication/cram_md5/auxprop.hpp | 25 +++++++++++++------------
 2 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/51fca3f7/src/authentication/cram_md5/auxprop.cpp
----------------------------------------------------------------------
diff --git a/src/authentication/cram_md5/auxprop.cpp b/src/authentication/cram_md5/auxprop.cpp
index 984ed90..abf0f8d 100644
--- a/src/authentication/cram_md5/auxprop.cpp
+++ b/src/authentication/cram_md5/auxprop.cpp
@@ -18,6 +18,8 @@
 
 #include "authentication/cram_md5/auxprop.hpp"
 
+#include <mutex>
+
 #include "logging/logging.hpp"
 
 using std::list;
@@ -30,8 +32,7 @@ namespace cram_md5 {
 // Storage for the static members.
 Multimap<string, Property> InMemoryAuxiliaryPropertyPlugin::properties;
 sasl_auxprop_plug_t InMemoryAuxiliaryPropertyPlugin::plugin;
-pthread_mutex_t InMemoryAuxiliaryPropertyPlugin::mutex =
-  PTHREAD_MUTEX_INITIALIZER;
+std::mutex InMemoryAuxiliaryPropertyPlugin::mutex;
 
 
 int InMemoryAuxiliaryPropertyPlugin::initialize(

http://git-wip-us.apache.org/repos/asf/mesos/blob/51fca3f7/src/authentication/cram_md5/auxprop.hpp
----------------------------------------------------------------------
diff --git a/src/authentication/cram_md5/auxprop.hpp b/src/authentication/cram_md5/auxprop.hpp
index 1a054ba..0fa87f4 100644
--- a/src/authentication/cram_md5/auxprop.hpp
+++ b/src/authentication/cram_md5/auxprop.hpp
@@ -19,8 +19,7 @@
 #ifndef __AUTHENTICATION_CRAM_MD5_AUXPROP_HPP__
 #define __AUTHENTICATION_CRAM_MD5_AUXPROP_HPP__
 
-#include <pthread.h>
-
+#include <mutex>
 #include <string>
 
 #include <sasl/sasl.h>
@@ -30,8 +29,7 @@
 #include <stout/multimap.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
-
-#include "common/lock.hpp"
+#include <stout/synchronized.hpp>
 
 namespace mesos {
 namespace internal {
@@ -51,22 +49,25 @@ public:
 
   static void load(const Multimap<std::string, Property>& _properties)
   {
-    Lock lock(&mutex);
-    properties = _properties;
+    synchronized (mutex) {
+      properties = _properties;
+    }
   }
 
   static Option<std::list<std::string>> lookup(
       const std::string& user,
       const std::string& name)
   {
-    Lock lock(&mutex);
-    if (properties.contains(user)) {
-      foreach (const Property& property, properties.get(user)) {
-        if (property.name == name) {
-          return property.values;
+    synchronized (mutex) {
+      if (properties.contains(user)) {
+        foreach (const Property& property, properties.get(user)) {
+          if (property.name == name) {
+            return property.values;
+          }
         }
       }
     }
+
     return None();
   }
 
@@ -98,7 +99,7 @@ private:
 
   // Access to 'properties' has to be protected as multiple
   // authenticator instances may be active concurrently.
-  static pthread_mutex_t mutex;
+  static std::mutex mutex;
 };
 
 } // namespace cram_md5 {


[06/21] mesos git commit: libprocess: Rebased for _CheckFatal changes in stout.

Posted by be...@apache.org.
libprocess: Rebased for _CheckFatal changes in stout.

Review: https://reviews.apache.org/r/35423


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c683b00b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c683b00b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c683b00b

Branch: refs/heads/master
Commit: c683b00bc7a01099f0ef614606b053e9b4fbb94f
Parents: 5f7c5a7
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 05:25:03 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/check.hpp | 67 ++++++++++------------
 1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c683b00b/3rdparty/libprocess/include/process/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/check.hpp b/3rdparty/libprocess/include/process/check.hpp
index 60989ac..4b43b70 100644
--- a/3rdparty/libprocess/include/process/check.hpp
+++ b/3rdparty/libprocess/include/process/check.hpp
@@ -25,79 +25,69 @@
 // This appends the error if possible to the end of the log message, so there's
 // no need to append the error message explicitly.
 #define CHECK_PENDING(expression)                                       \
-  for (const Option<std::string> _error = _checkPending(expression);    \
-       _error.isSome();)                                                \
-    _CheckFatal(__FILE__, __LINE__, "CHECK_PENDING",                    \
-                #expression, _error.get()).stream()
+  CHECK_STATE(CHECK_PENDING, _check_pending, expression)
 
 #define CHECK_READY(expression)                                         \
-  for (const Option<std::string> _error = _checkReady(expression);      \
-       _error.isSome();)                                                \
-    _CheckFatal(__FILE__, __LINE__, "CHECK_READY",                      \
-                #expression, _error.get()).stream()
+  CHECK_STATE(CHECK_READY, _check_ready, expression)
 
 #define CHECK_DISCARDED(expression)                                     \
-  for (const Option<std::string> _error = _checkDiscarded(expression);  \
-       _error.isSome();)                                                \
-    _CheckFatal(__FILE__, __LINE__, "CHECK_DISCARDED",                  \
-                #expression, _error.get()).stream()
+  CHECK_STATE(CHECK_DISCARDED, _check_discarded, expression)
 
 #define CHECK_FAILED(expression)                                        \
-  for (const Option<std::string> _error = _checkFailed(expression);     \
-       _error.isSome();)                                                \
-    _CheckFatal(__FILE__, __LINE__, "CHECK_FAILED",                     \
-                #expression, _error.get()).stream()
-
+  CHECK_STATE(CHECK_FAILED, _check_failed, expression)
 
 // Private structs/functions used for CHECK_*.
 
 template <typename T>
-Option<std::string> _checkPending(const process::Future<T>& f)
+Option<Error> _check_pending(const process::Future<T>& f)
 {
   if (f.isReady()) {
-    return Some("is READY");
+    return Error("is READY");
   } else if (f.isDiscarded()) {
-    return Some("is DISCARDED");
+    return Error("is DISCARDED");
   } else if (f.isFailed()) {
-    return Some("is FAILED: " + f.failure());
+    return Error("is FAILED: " + f.failure());
+  } else {
+    CHECK(f.isPending());
+    return None();
   }
-  CHECK(f.isPending());
-  return None();
 }
 
 
 template <typename T>
-Option<std::string> _checkReady(const process::Future<T>& f)
+Option<Error> _check_ready(const process::Future<T>& f)
 {
   if (f.isPending()) {
-    return Some("is PENDING");
+    return Error("is PENDING");
   } else if (f.isDiscarded()) {
-    return Some("is DISCARDED");
+    return Error("is DISCARDED");
   } else if (f.isFailed()) {
-    return Some("is FAILED: " + f.failure());
+    return Error("is FAILED: " + f.failure());
+  } else {
+    CHECK(f.isReady());
+    return None();
   }
-  CHECK(f.isReady());
-  return None();
 }
 
 
 template <typename T>
-Option<std::string> _checkDiscarded(const process::Future<T>& f)
+Option<Error> _check_discarded(const process::Future<T>& f)
 {
   if (f.isPending()) {
-    return Some("is PENDING");
+    return Error("is PENDING");
   } else if (f.isReady()) {
-    return Some("is READY");
+    return Error("is READY");
   } else if (f.isFailed()) {
-    return Some("is FAILED: " + f.failure());
+    return Error("is FAILED: " + f.failure());
+  } else {
+    CHECK(f.isDiscarded());
+    return None();
   }
-  CHECK(f.isDiscarded());
-  return None();
 }
 
 
 template <typename T>
-Option<std::string> _checkFailed(const process::Future<T>& f)
+Option<Error> _check_failed(const process::Future<T>& f)
 {
   if (f.isPending()) {
     return Some("is PENDING");
@@ -105,9 +95,10 @@ Option<std::string> _checkFailed(const process::Future<T>& f)
     return Some("is READY");
   } else if (f.isDiscarded()) {
     return Some("is DISCARDED");
+  } else {
+    CHECK(f.isFailed());
+    return None();
   }
-  CHECK(f.isFailed());
-  return None();
 }
 
 // TODO(dhamon): CHECK_NPENDING, CHECK_NREADY, etc.


[12/21] mesos git commit: libprocess: Use CHECK_NONE.

Posted by be...@apache.org.
libprocess: Use CHECK_NONE.

Used `grep -r "CHECK([^\!].*\.isNone())" .` to find the instances that
look like `CHECK(x.isNone());`

Review: https://reviews.apache.org/r/35425


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/98236481
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/98236481
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/98236481

Branch: refs/heads/master
Commit: 9823648125a5fe560f9c278ee0a1239d7a8eabfb
Parents: ad37834
Author: Michael Park <mc...@gmail.com>
Authored: Sat Jun 13 07:28:48 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/decoder.hpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/98236481/3rdparty/libprocess/src/decoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp
index 56adde0..85ce9e3 100644
--- a/3rdparty/libprocess/src/decoder.hpp
+++ b/3rdparty/libprocess/src/decoder.hpp
@@ -555,7 +555,7 @@ private:
     decoder->value.clear();
 
     CHECK(decoder->response == NULL);
-    CHECK(decoder->writer.isNone());
+    CHECK_NONE(decoder->writer);
 
     decoder->response = new http::Response();
     decoder->response->type = http::Response::PIPE;
@@ -642,7 +642,7 @@ private:
       return 1;
     }
 
-    CHECK(decoder->writer.isNone());
+    CHECK_NONE(decoder->writer);
 
     http::Pipe pipe;
     decoder->writer = pipe.writer();


[21/21] mesos git commit: Update ModuleManager to use synchronized.

Posted by be...@apache.org.
Update ModuleManager to use synchronized.

Review: https://reviews.apache.org/r/35098


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/98039c99
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/98039c99
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/98039c99

Branch: refs/heads/master
Commit: 98039c99b0a2e0a36a7d4c22e672ecd817923bb4
Parents: 8939609
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:15:59 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/module/manager.cpp | 129 ++++++++++++++++++++++----------------------
 src/module/manager.hpp |  71 ++++++++++++------------
 2 files changed, 101 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/98039c99/src/module/manager.cpp
----------------------------------------------------------------------
diff --git a/src/module/manager.cpp b/src/module/manager.cpp
index 2c7f876..909ca56 100644
--- a/src/module/manager.cpp
+++ b/src/module/manager.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -31,8 +32,6 @@
 #include <stout/stringify.hpp>
 #include <stout/version.hpp>
 
-#include "common/lock.hpp"
-
 #include "messages/messages.hpp"
 
 #include "module/manager.hpp"
@@ -46,7 +45,7 @@ using namespace mesos;
 using namespace mesos::internal;
 using namespace mesos::modules;
 
-pthread_mutex_t ModuleManager::mutex = PTHREAD_MUTEX_INITIALIZER;
+std::mutex ModuleManager::mutex;
 hashmap<const string, string> ModuleManager::kindToVersion;
 hashmap<const string, ModuleBase*> ModuleManager::moduleBases;
 hashmap<const string, Owned<DynamicLibrary>> ModuleManager::dynamicLibraries;
@@ -104,15 +103,16 @@ void ModuleManager::initialize()
 // of ModuleBases.
 Try<Nothing> ModuleManager::unload(const string& moduleName)
 {
-  Lock lock(&mutex);
-  if (!moduleBases.contains(moduleName)) {
-    return Error(
-        "Error unloading module '" + moduleName + "': module not loaded");
-  }
+  synchronized (mutex) {
+    if (!moduleBases.contains(moduleName)) {
+      return Error(
+          "Error unloading module '" + moduleName + "': module not loaded");
+    }
 
-  // Do not remove the dynamiclibrary as it could result in unloading
-  // the library from the process memory.
-  moduleBases.erase(moduleName);
+    // Do not remove the dynamiclibrary as it could result in
+    // unloading the library from the process memory.
+    moduleBases.erase(moduleName);
+  }
   return Nothing();
 }
 
@@ -189,64 +189,67 @@ Try<Nothing> ModuleManager::verifyModule(
 
 Try<Nothing> ModuleManager::load(const Modules& modules)
 {
-  Lock lock(&mutex);
-  initialize();
-
-  foreach (const Modules::Library& library, modules.libraries()) {
-    string libraryName;
-    if (library.has_file()) {
-      libraryName = library.file();
-    } else if (library.has_name()) {
-      libraryName = os::libraries::expandName(library.name());
-    } else {
-      return Error("Library name or path not provided");
-    }
-
-    if (!dynamicLibraries.contains(libraryName)) {
-      Owned<DynamicLibrary> dynamicLibrary(new DynamicLibrary());
-      Try<Nothing> result = dynamicLibrary->open(libraryName);
-      if (!result.isSome()) {
-        return Error(
-            "Error opening library: '" + libraryName + "': " + result.error());
+  synchronized (mutex) {
+    initialize();
+
+    foreach (const Modules::Library& library, modules.libraries()) {
+      string libraryName;
+      if (library.has_file()) {
+        libraryName = library.file();
+      } else if (library.has_name()) {
+        libraryName = os::libraries::expandName(library.name());
+      } else {
+        return Error("Library name or path not provided");
       }
 
-      dynamicLibraries[libraryName] = dynamicLibrary;
-    }
-
-    // Load module manifests.
-    foreach (const Modules::Library::Module& module, library.modules()) {
-      if (!module.has_name()) {
-        return Error(
-            "Error: module name not provided with library '" + libraryName +
-            "'");
-      }
+      if (!dynamicLibraries.contains(libraryName)) {
+        Owned<DynamicLibrary> dynamicLibrary(new DynamicLibrary());
+        Try<Nothing> result = dynamicLibrary->open(libraryName);
+        if (!result.isSome()) {
+          return Error(
+              "Error opening library: '" + libraryName +
+              "': " + result.error());
+        }
 
-      // Check for possible duplicate module names.
-      const std::string moduleName = module.name();
-      if (moduleBases.contains(moduleName)) {
-        return Error("Error loading duplicate module '" + moduleName + "'");
+        dynamicLibraries[libraryName] = dynamicLibrary;
       }
 
-      // Load ModuleBase.
-      Try<void*> symbol = dynamicLibraries[libraryName]->loadSymbol(moduleName);
-      if (symbol.isError()) {
-        return Error(
-            "Error loading module '" + moduleName + "': " + symbol.error());
+      // Load module manifests.
+      foreach (const Modules::Library::Module& module, library.modules()) {
+        if (!module.has_name()) {
+          return Error(
+              "Error: module name not provided with library '" + libraryName +
+              "'");
+        }
+
+        // Check for possible duplicate module names.
+        const std::string moduleName = module.name();
+        if (moduleBases.contains(moduleName)) {
+          return Error("Error loading duplicate module '" + moduleName + "'");
+        }
+
+        // Load ModuleBase.
+        Try<void*> symbol =
+          dynamicLibraries[libraryName]->loadSymbol(moduleName);
+        if (symbol.isError()) {
+          return Error(
+              "Error loading module '" + moduleName + "': " + symbol.error());
+        }
+        ModuleBase* moduleBase = (ModuleBase*) symbol.get();
+
+        // Verify module compatibility including version, etc.
+        Try<Nothing> result = verifyModule(moduleName, moduleBase);
+        if (result.isError()) {
+          return Error(
+              "Error verifying module '" + moduleName + "': " + result.error());
+        }
+
+        moduleBases[moduleName] = (ModuleBase*) symbol.get();
+
+        // Now copy the supplied module-specific parameters.
+        moduleParameters[moduleName].mutable_parameter()->CopyFrom(
+            module.parameters());
       }
-      ModuleBase* moduleBase = (ModuleBase*) symbol.get();
-
-      // Verify module compatibility including version, etc.
-      Try<Nothing> result = verifyModule(moduleName, moduleBase);
-      if (result.isError()) {
-        return Error(
-            "Error verifying module '" + moduleName + "': " + result.error());
-      }
-
-      moduleBases[moduleName] = (ModuleBase*) symbol.get();
-
-      // Now copy the supplied module-specific parameters.
-      moduleParameters[moduleName].mutable_parameter()->CopyFrom(
-          module.parameters());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/98039c99/src/module/manager.hpp
----------------------------------------------------------------------
diff --git a/src/module/manager.hpp b/src/module/manager.hpp
index 4befb64..cab67a8 100644
--- a/src/module/manager.hpp
+++ b/src/module/manager.hpp
@@ -19,9 +19,8 @@
 #ifndef __MODULE_MANAGER_HPP__
 #define __MODULE_MANAGER_HPP__
 
-#include <pthread.h>
-
 #include <list>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -37,8 +36,8 @@
 #include <stout/check.hpp>
 #include <stout/dynamiclibrary.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/synchronized.hpp>
 
-#include "common/lock.hpp"
 #include "messages/messages.hpp"
 
 namespace mesos {
@@ -71,40 +70,42 @@ public:
   template <typename T>
   static Try<T*> create(const std::string& moduleName)
   {
-    mesos::internal::Lock lock(&mutex);
-    if (!moduleBases.contains(moduleName)) {
-      return Error(
-          "Module '" + moduleName + "' unknown");
-    }
+    synchronized (mutex) {
+      if (!moduleBases.contains(moduleName)) {
+        return Error(
+            "Module '" + moduleName + "' unknown");
+      }
 
-    Module<T>* module = (Module<T>*) moduleBases[moduleName];
-    if (module->create == NULL) {
-      return Error(
-          "Error creating module instance for '" + moduleName + "': "
-          "create() method not found");
-    }
+      Module<T>* module = (Module<T>*) moduleBases[moduleName];
+      if (module->create == NULL) {
+        return Error(
+            "Error creating module instance for '" + moduleName + "': "
+            "create() method not found");
+      }
 
-    std::string expectedKind = kind<T>();
-    if (expectedKind != module->kind) {
-      return Error(
-          "Error creating module instance for '" + moduleName + "': "
-          "module is of kind '" + module->kind + "', but the requested "
-          "kind is '" + expectedKind + "'");
-    }
+      std::string expectedKind = kind<T>();
+      if (expectedKind != module->kind) {
+        return Error(
+            "Error creating module instance for '" + moduleName + "': "
+            "module is of kind '" + module->kind + "', but the requested "
+            "kind is '" + expectedKind + "'");
+      }
 
-    T* instance = module->create(moduleParameters[moduleName]);
-    if (instance == NULL) {
-      return Error("Error creating Module instance for '" + moduleName + "'");
+      T* instance = module->create(moduleParameters[moduleName]);
+      if (instance == NULL) {
+        return Error("Error creating Module instance for '" + moduleName + "'");
+      }
+      return instance;
     }
-    return instance;
   }
 
   template <typename T>
   static bool contains(const std::string& moduleName)
   {
-    mesos::internal::Lock lock(&mutex);
-    return (moduleBases.contains(moduleName) &&
-            moduleBases[moduleName]->kind == stringify(kind<T>()));
+    synchronized (mutex) {
+      return (moduleBases.contains(moduleName) &&
+              moduleBases[moduleName]->kind == stringify(kind<T>()));
+    }
   }
 
   // Returns all module names that have been loaded that implement the
@@ -117,13 +118,13 @@ public:
   template <typename T>
   static std::vector<std::string> find()
   {
-    mesos::internal::Lock lock(&mutex);
-
     std::vector<std::string> names;
 
-    foreachpair (const std::string& name, ModuleBase* base, moduleBases) {
-      if (base->kind == stringify(kind<T>())) {
-        names.push_back(name);
+    synchronized (mutex) {
+      foreachpair (const std::string& name, ModuleBase* base, moduleBases) {
+        if (base->kind == stringify(kind<T>())) {
+          names.push_back(name);
+        }
       }
     }
 
@@ -141,9 +142,7 @@ private:
       const std::string& moduleName,
       const ModuleBase* moduleBase);
 
-  // TODO(karya): Replace pthread_mutex_t with std::mutex in
-  // common/lock.hpp and other places that refer to it.
-  static pthread_mutex_t mutex;
+  static std::mutex mutex;
 
   static hashmap<const std::string, std::string> kindToVersion;