You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:31 UTC
[13/21] mesos git commit: Update Mesos scheduler to use synchronized.
Update Mesos scheduler to use synchronized.
Review: https://reviews.apache.org/r/35096
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b2d80474
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b2d80474
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b2d80474
Branch: refs/heads/master
Commit: b2d8047428228cbbea65f4af889d11e8918e2e96
Parents: 5b0eeb0
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:06:22 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
include/mesos/scheduler.hpp | 5 +-
src/sched/sched.cpp | 382 +++++++++++++++++++--------------------
2 files changed, 193 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 2ee6b5c..0b54ffe 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,11 +19,10 @@
#ifndef __MESOS_SCHEDULER_HPP__
#define __MESOS_SCHEDULER_HPP__
-#include <functional>
-#include <queue>
-
#include <pthread.h>
+#include <functional>
+#include <queue>
#include <string>
#include <vector>
http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9423607..bc76c71 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -71,8 +71,6 @@
#include "authentication/cram_md5/authenticatee.hpp"
-#include "common/lock.hpp"
-
#include "local/flags.hpp"
#include "local/local.hpp"
@@ -841,8 +839,9 @@ protected:
send(master.get(), message);
}
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
// NOTE: This function informs the master to stop attempting to send
@@ -866,8 +865,9 @@ protected:
send(master.get(), message);
}
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void killTask(const TaskID& taskId)
@@ -1507,156 +1507,156 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
Status MesosSchedulerDriver::start()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_NOT_STARTED) {
- return status;
- }
-
- if (detector == NULL) {
- Try<MasterDetector*> detector_ = MasterDetector::create(url);
-
- if (detector_.isError()) {
- status = DRIVER_ABORTED;
- string message = "Failed to create a master detector for '" +
- master + "': " + detector_.error();
- scheduler->error(this, message);
+ synchronized (mutex) {
+ if (status != DRIVER_NOT_STARTED) {
return status;
}
- // Save the detector so we can delete it later.
- detector = detector_.get();
- }
+ if (detector == NULL) {
+ Try<MasterDetector*> detector_ = MasterDetector::create(url);
- // Load scheduler flags.
- internal::scheduler::Flags flags;
- Try<Nothing> load = flags.load("MESOS_");
+ if (detector_.isError()) {
+ status = DRIVER_ABORTED;
+ string message = "Failed to create a master detector for '" +
+ master + "': " + detector_.error();
+ scheduler->error(this, message);
+ return status;
+ }
- if (load.isError()) {
- status = DRIVER_ABORTED;
- scheduler->error(this, load.error());
- return status;
- }
+ // Save the detector so we can delete it later.
+ detector = detector_.get();
+ }
- // Initialize modules. Note that since other subsystems may depend
- // upon modules, we should initialize modules before anything else.
- if (flags.modules.isSome()) {
- Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
- if (result.isError()) {
+ // Load scheduler flags.
+ internal::scheduler::Flags flags;
+ Try<Nothing> load = flags.load("MESOS_");
+
+ if (load.isError()) {
status = DRIVER_ABORTED;
- scheduler->error(this, "Error loading modules: " + result.error());
+ scheduler->error(this, load.error());
return status;
}
- }
- CHECK(process == NULL);
+ // Initialize modules. Note that since other subsystems may depend
+ // upon modules, we should initialize modules before anything else.
+ if (flags.modules.isSome()) {
+ Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
+ if (result.isError()) {
+ status = DRIVER_ABORTED;
+ scheduler->error(this, "Error loading modules: " + result.error());
+ return status;
+ }
+ }
- if (credential == NULL) {
- process = new SchedulerProcess(
- this,
- scheduler,
- framework,
- None(),
- implicitAcknowlegements,
- schedulerId,
- detector,
- flags,
- &mutex,
- &cond);
- } else {
- const Credential& cred = *credential;
- process = new SchedulerProcess(
- this,
- scheduler,
- framework,
- cred,
- implicitAcknowlegements,
- schedulerId,
- detector,
- flags,
- &mutex,
- &cond);
+ CHECK(process == NULL);
+
+ if (credential == NULL) {
+ process = new SchedulerProcess(
+ this,
+ scheduler,
+ framework,
+ None(),
+ implicitAcknowlegements,
+ schedulerId,
+ detector,
+ flags,
+ &mutex,
+ &cond);
+ } else {
+ const Credential& cred = *credential;
+ process = new SchedulerProcess(
+ this,
+ scheduler,
+ framework,
+ cred,
+ implicitAcknowlegements,
+ schedulerId,
+ detector,
+ flags,
+ &mutex,
+ &cond);
+ }
+
+ spawn(process);
+
+ return status = DRIVER_RUNNING;
}
-
- spawn(process);
-
- return status = DRIVER_RUNNING;
}
Status MesosSchedulerDriver::stop(bool failover)
{
- Lock lock(&mutex);
-
- LOG(INFO) << "Asked to stop the driver";
+ synchronized (mutex) {
+ LOG(INFO) << "Asked to stop the driver";
- if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
- VLOG(1) << "Ignoring stop because the status of the driver is "
- << Status_Name(status);
- return status;
- }
+ if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+ VLOG(1) << "Ignoring stop because the status of the driver is "
+ << Status_Name(status);
+ return status;
+ }
- // 'process' might be NULL if the driver has failed to instantiate
- // it due to bad parameters (e.g. error in creating the detector
- // or loading flags).
- if (process != NULL) {
- process->running = false;
- dispatch(process, &SchedulerProcess::stop, failover);
- }
+ // 'process' might be NULL if the driver has failed to instantiate
+ // it due to bad parameters (e.g. error in creating the detector
+ // or loading flags).
+ if (process != NULL) {
+ process->running = false;
+ dispatch(process, &SchedulerProcess::stop, failover);
+ }
- // TODO(benh): It might make more sense to clean up our local
- // cluster here than in the destructor. However, what would be even
- // better is to allow multiple local clusters to exist (i.e. not use
- // global vars in local.cpp) so that ours can just be an instance
- // variable in MesosSchedulerDriver.
+ // TODO(benh): It might make more sense to clean up our local
+ // cluster here than in the destructor. However, what would be
+ // even better is to allow multiple local clusters to exist (i.e.
+ // not use global vars in local.cpp) so that ours can just be an
+ // instance variable in MesosSchedulerDriver.
- bool aborted = status == DRIVER_ABORTED;
+ bool aborted = status == DRIVER_ABORTED;
- status = DRIVER_STOPPED;
+ status = DRIVER_STOPPED;
- return aborted ? DRIVER_ABORTED : status;
+ return aborted ? DRIVER_ABORTED : status;
+ }
}
Status MesosSchedulerDriver::abort()
{
- Lock lock(&mutex);
-
- LOG(INFO) << "Asked to abort the driver";
+ synchronized (mutex) {
+ LOG(INFO) << "Asked to abort the driver";
- if (status != DRIVER_RUNNING) {
- VLOG(1) << "Ignoring abort because the status of the driver is "
- << Status_Name(status);
- return status;
- }
+ if (status != DRIVER_RUNNING) {
+ VLOG(1) << "Ignoring abort because the status of the driver is "
+ << Status_Name(status);
+ return status;
+ }
- CHECK_NOTNULL(process);
- process->running = false;
+ CHECK_NOTNULL(process);
+ process->running = false;
- // Dispatching here ensures that we still process the outstanding
- // requests *from* the scheduler, since those do proceed when
- // aborted is true.
- dispatch(process, &SchedulerProcess::abort);
+ // Dispatching here ensures that we still process the outstanding
+ // requests *from* the scheduler, since those do proceed when
+ // aborted is true.
+ dispatch(process, &SchedulerProcess::abort);
- return status = DRIVER_ABORTED;
+ return status = DRIVER_ABORTED;
+ }
}
Status MesosSchedulerDriver::join()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- while (status == DRIVER_RUNNING) {
- pthread_cond_wait(&cond, &mutex);
- }
+ while (status == DRIVER_RUNNING) {
+ pthread_cond_wait(&cond, &mutex);
+ }
- CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+ CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
- return status;
+ return status;
+ }
}
@@ -1669,17 +1669,17 @@ Status MesosSchedulerDriver::run()
Status MesosSchedulerDriver::killTask(const TaskID& taskId)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::killTask, taskId);
+ dispatch(process, &SchedulerProcess::killTask, taskId);
- return status;
+ return status;
+ }
}
@@ -1700,17 +1700,17 @@ Status MesosSchedulerDriver::launchTasks(
const vector<TaskInfo>& tasks,
const Filters& filters)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
+ dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
- return status;
+ return status;
+ }
}
@@ -1719,22 +1719,22 @@ Status MesosSchedulerDriver::acceptOffers(
const vector<Offer::Operation>& operations,
const Filters& filters)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(
- process,
- &SchedulerProcess::acceptOffers,
- offerIds,
- operations,
- filters);
+ dispatch(
+ process,
+ &SchedulerProcess::acceptOffers,
+ offerIds,
+ operations,
+ filters);
- return status;
+ return status;
+ }
}
@@ -1751,40 +1751,40 @@ Status MesosSchedulerDriver::declineOffer(
Status MesosSchedulerDriver::reviveOffers()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::reviveOffers);
+ dispatch(process, &SchedulerProcess::reviveOffers);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::acknowledgeStatusUpdate(
const TaskStatus& taskStatus)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- // TODO(bmahler): Should this use abort() instead?
- if (implicitAcknowlegements) {
- ABORT("Cannot call acknowledgeStatusUpdate:"
- " Implicit acknowledgements are enabled");
- }
+ // TODO(bmahler): Should this use abort() instead?
+ if (implicitAcknowlegements) {
+ ABORT("Cannot call acknowledgeStatusUpdate:"
+ " Implicit acknowledgements are enabled");
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
+ dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus);
- return status;
+ return status;
+ }
}
@@ -1793,50 +1793,50 @@ Status MesosSchedulerDriver::sendFrameworkMessage(
const SlaveID& slaveId,
const string& data)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::sendFrameworkMessage,
- executorId, slaveId, data);
+ dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+ executorId, slaveId, data);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::reconcileTasks(
const vector<TaskStatus>& statuses)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
+ dispatch(process, &SchedulerProcess::reconcileTasks, statuses);
- return status;
+ return status;
+ }
}
Status MesosSchedulerDriver::requestResources(
const vector<Request>& requests)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &SchedulerProcess::requestResources, requests);
+ dispatch(process, &SchedulerProcess::requestResources, requests);
- return status;
+ return status;
+ }
}