You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:31 UTC

[13/21] mesos git commit: Update Mesos scheduler to use synchronized.

Update Mesos scheduler to use synchronized.

Review: https://reviews.apache.org/r/35096


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b2d80474
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b2d80474
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b2d80474

Branch: refs/heads/master
Commit: b2d8047428228cbbea65f4af889d11e8918e2e96
Parents: 5b0eeb0
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:06:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 include/mesos/scheduler.hpp |   5 +-
 src/sched/sched.cpp         | 382 +++++++++++++++++++--------------------
 2 files changed, 193 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ee6b5c..0b54ffe 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,11 +19,10 @@
 #ifndef __MESOS_SCHEDULER_HPP__
 #define __MESOS_SCHEDULER_HPP__
 
-#include <functional>
-#include <queue>
-
 #include <pthread.h>
 
+#include <functional>
+#include <queue>
 #include <string>
 #include <vector>
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9423607..bc76c71 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -71,8 +71,6 @@
 
 #include "authentication/cram_md5/authenticatee.hpp"
 
-#include "common/lock.hpp"
-
 #include "local/flags.hpp"
 #include "local/local.hpp"
 
@@ -841,8 +839,9 @@ protected:
       send(master.get(), message);
     }
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   // NOTE: This function informs the master to stop attempting to send
@@ -866,8 +865,9 @@ protected:
       send(master.get(), message);
     }
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void killTask(const TaskID& taskId)
@@ -1507,156 +1507,156 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
 
 Status MesosSchedulerDriver::start()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_NOT_STARTED) {
-    return status;
-  }
-
-  if (detector == NULL) {
-    Try<MasterDetector*> detector_ = MasterDetector::create(url);
-
-    if (detector_.isError()) {
-      status = DRIVER_ABORTED;
-      string message = "Failed to create a master detector for '" +
-      master + "': " + detector_.error();
-      scheduler->error(this, message);
+  synchronized (mutex) {
+    if (status != DRIVER_NOT_STARTED) {
       return status;
     }
 
-    // Save the detector so we can delete it later.
-    detector = detector_.get();
-  }
+    if (detector == NULL) {
+      Try<MasterDetector*> detector_ = MasterDetector::create(url);
 
-  // Load scheduler flags.
-  internal::scheduler::Flags flags;
-  Try<Nothing> load = flags.load("MESOS_");
+      if (detector_.isError()) {
+        status = DRIVER_ABORTED;
+        string message = "Failed to create a master detector for '" +
+        master + "': " + detector_.error();
+        scheduler->error(this, message);
+        return status;
+      }
 
-  if (load.isError()) {
-    status = DRIVER_ABORTED;
-    scheduler->error(this, load.error());
-    return status;
-  }
+      // Save the detector so we can delete it later.
+      detector = detector_.get();
+    }
 
-  // Initialize modules. Note that since other subsystems may depend
-  // upon modules, we should initialize modules before anything else.
-  if (flags.modules.isSome()) {
-    Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
-    if (result.isError()) {
+    // Load scheduler flags.
+    internal::scheduler::Flags flags;
+    Try<Nothing> load = flags.load("MESOS_");
+
+    if (load.isError()) {
       status = DRIVER_ABORTED;
-      scheduler->error(this, "Error loading modules: " + result.error());
+      scheduler->error(this, load.error());
       return status;
     }
-  }
 
-  CHECK(process == NULL);
+    // Initialize modules. Note that since other subsystems may depend
+    // upon modules, we should initialize modules before anything else.
+    if (flags.modules.isSome()) {
+      Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
+      if (result.isError()) {
+        status = DRIVER_ABORTED;
+        scheduler->error(this, "Error loading modules: " + result.error());
+        return status;
+      }
+    }
 
-  if (credential == NULL) {
-    process = new SchedulerProcess(
-        this,
-        scheduler,
-        framework,
-        None(),
-        implicitAcknowlegements,
-        schedulerId,
-        detector,
-        flags,
-        &mutex,
-        &cond);
-  } else {
-    const Credential& cred = *credential;
-    process = new SchedulerProcess(
-        this,
-        scheduler,
-        framework,
-        cred,
-        implicitAcknowlegements,
-        schedulerId,
-        detector,
-        flags,
-        &mutex,
-        &cond);
+    CHECK(process == NULL);
+
+    if (credential == NULL) {
+      process = new SchedulerProcess(
+          this,
+          scheduler,
+          framework,
+          None(),
+          implicitAcknowlegements,
+          schedulerId,
+          detector,
+          flags,
+          &mutex,
+          &cond);
+    } else {
+      const Credential& cred = *credential;
+      process = new SchedulerProcess(
+          this,
+          scheduler,
+          framework,
+          cred,
+          implicitAcknowlegements,
+          schedulerId,
+          detector,
+          flags,
+          &mutex,
+          &cond);
+    }
+
+    spawn(process);
+
+    return status = DRIVER_RUNNING;
   }
-
-  spawn(process);
-
-  return status = DRIVER_RUNNING;
 }
 
 
 Status MesosSchedulerDriver::stop(bool failover)
 {
-  Lock lock(&mutex);
-
-  LOG(INFO) << "Asked to stop the driver";
+  synchronized (mutex) {
+    LOG(INFO) << "Asked to stop the driver";
 
-  if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
-    VLOG(1) << "Ignoring stop because the status of the driver is "
-            << Status_Name(status);
-    return status;
-  }
+    if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+      VLOG(1) << "Ignoring stop because the status of the driver is "
+              << Status_Name(status);
+      return status;
+    }
 
-  // 'process' might be NULL if the driver has failed to instantiate
-  // it due to bad parameters (e.g. error in creating the detector
-  // or loading flags).
-  if (process != NULL) {
-    process->running =  false;
-    dispatch(process, &SchedulerProcess::stop, failover);
-  }
+    // 'process' might be NULL if the driver has failed to instantiate
+    // it due to bad parameters (e.g. error in creating the detector
+    // or loading flags).
+    if (process != NULL) {
+      process->running =  false;
+      dispatch(process, &SchedulerProcess::stop, failover);
+    }
 
-  // TODO(benh): It might make more sense to clean up our local
-  // cluster here than in the destructor. However, what would be even
-  // better is to allow multiple local clusters to exist (i.e. not use
-  // global vars in local.cpp) so that ours can just be an instance
-  // variable in MesosSchedulerDriver.
+    // TODO(benh): It might make more sense to clean up our local
+    // cluster here than in the destructor. However, what would be
+    // even better is to allow multiple local clusters to exist (i.e.
+    // not use global vars in local.cpp) so that ours can just be an
+    // instance variable in MesosSchedulerDriver.
 
-  bool aborted = status == DRIVER_ABORTED;
+    bool aborted = status == DRIVER_ABORTED;
 
-  status = DRIVER_STOPPED;
+    status = DRIVER_STOPPED;
 
-  return aborted ? DRIVER_ABORTED : status;
+    return aborted ? DRIVER_ABORTED : status;
+  }
 }
 
 
 Status MesosSchedulerDriver::abort()
 {
-  Lock lock(&mutex);
-
-  LOG(INFO) << "Asked to abort the driver";
+  synchronized (mutex) {
+    LOG(INFO) << "Asked to abort the driver";
 
-  if (status != DRIVER_RUNNING) {
-    VLOG(1) << "Ignoring abort because the status of the driver is "
-            << Status_Name(status);
-    return status;
-  }
+    if (status != DRIVER_RUNNING) {
+      VLOG(1) << "Ignoring abort because the status of the driver is "
+              << Status_Name(status);
+      return status;
+    }
 
-  CHECK_NOTNULL(process);
-  process->running = false;
+    CHECK_NOTNULL(process);
+    process->running = false;
 
-  // Dispatching here ensures that we still process the outstanding
-  // requests *from* the scheduler, since those do proceed when
-  // aborted is true.
-  dispatch(process, &SchedulerProcess::abort);
+    // Dispatching here ensures that we still process the outstanding
+    // requests *from* the scheduler, since those do proceed when
+    // aborted is true.
+    dispatch(process, &SchedulerProcess::abort);
 
-  return status = DRIVER_ABORTED;
+    return status = DRIVER_ABORTED;
+  }
 }
 
 
 Status MesosSchedulerDriver::join()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  while (status == DRIVER_RUNNING) {
-    pthread_cond_wait(&cond, &mutex);
-  }
+    while (status == DRIVER_RUNNING) {
+      pthread_cond_wait(&cond, &mutex);
+    }
 
-  CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+    CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1669,17 +1669,17 @@ Status MesosSchedulerDriver::run()
 
 Status MesosSchedulerDriver::killTask(const TaskID& taskId)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::killTask, taskId);
+    dispatch(process, &SchedulerProcess::killTask, taskId);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1700,17 +1700,17 @@ Status MesosSchedulerDriver::launchTasks(
     const vector<TaskInfo>& tasks,
     const Filters& filters)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
+    dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1719,22 +1719,22 @@ Status MesosSchedulerDriver::acceptOffers(
     const vector<Offer::Operation>& operations,
     const Filters& filters)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(
-      process,
-      &SchedulerProcess::acceptOffers,
-      offerIds,
-      operations,
-      filters);
+    dispatch(
+        process,
+        &SchedulerProcess::acceptOffers,
+        offerIds,
+        operations,
+        filters);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1751,40 +1751,40 @@ Status MesosSchedulerDriver::declineOffer(
 
 Status MesosSchedulerDriver::reviveOffers()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::reviveOffers);
+    dispatch(process, &SchedulerProcess::reviveOffers);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::acknowledgeStatusUpdate(
     const TaskStatus& taskStatus)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  // TODO(bmahler): Should this use abort() instead?
-  if (implicitAcknowlegements) {
-    ABORT("Cannot call acknowledgeStatusUpdate:"
-          " Implicit acknowledgements are enabled");
-  }
+    // TODO(bmahler): Should this use abort() instead?
+    if (implicitAcknowlegements) {
+      ABORT("Cannot call acknowledgeStatusUpdate:"
+            " Implicit acknowledgements are enabled");
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+    dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -1793,50 +1793,50 @@ Status MesosSchedulerDriver::sendFrameworkMessage(
     const SlaveID& slaveId,
     const string& data)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::sendFrameworkMessage,
-           executorId, slaveId, data);
+    dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+            executorId, slaveId, data);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::reconcileTasks(
     const vector<TaskStatus>& statuses)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
+    dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosSchedulerDriver::requestResources(
     const vector<Request>& requests)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::requestResources, requests);
+    dispatch(process, &SchedulerProcess::requestResources, requests);
 
-  return status;
+    return status;
+  }
 }