You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC
svn commit: r1236485 [2/7] - in /incubator/mesos/trunk: ./ include/mesos/
src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/
src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/
third_party/libprocess/include/pr...
Modified: incubator/mesos/trunk/src/master/slaves_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.cpp Fri Jan 27 01:25:13 2012
@@ -42,6 +42,7 @@ using namespace mesos::internal::master;
using boost::bad_lexical_cast;
using boost::lexical_cast;
+using process::Future;
using process::HttpInternalServerErrorResponse;
using process::HttpNotFoundResponse;
using process::HttpOKResponse;
@@ -49,7 +50,6 @@ using process::HttpResponse;
using process::HttpRequest;
using process::PID;
using process::Process;
-using process::Promise;
using process::UPID;
using std::ostringstream;
@@ -71,16 +71,16 @@ public:
virtual ~ZooKeeperSlavesManagerStorage();
- virtual Promise<bool> add(const string& hostname, uint16_t port);
- virtual Promise<bool> remove(const string& hostname, uint16_t port);
- virtual Promise<bool> activate(const string& hostname, uint16_t port);
- virtual Promise<bool> deactivate(const string& hostname, uint16_t port);
-
- Promise<bool> connected();
- Promise<bool> reconnecting();
- Promise<bool> reconnected();
- Promise<bool> expired();
- Promise<bool> updated(const string& path);
+ virtual Future<bool> add(const string& hostname, uint16_t port);
+ virtual Future<bool> remove(const string& hostname, uint16_t port);
+ virtual Future<bool> activate(const string& hostname, uint16_t port);
+ virtual Future<bool> deactivate(const string& hostname, uint16_t port);
+
+ Future<bool> connected();
+ Future<bool> reconnecting();
+ Future<bool> reconnected();
+ Future<bool> expired();
+ Future<bool> updated(const string& path);
private:
bool parse(const string& key,
@@ -159,7 +159,7 @@ ZooKeeperSlavesManagerStorage::~ZooKeepe
}
-Promise<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
{
// TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
int ret;
@@ -214,7 +214,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
{
// TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
int ret;
@@ -269,7 +269,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
{
// TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
int ret;
@@ -350,7 +350,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
+Future<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
{
// TODO(benh): Use ZooKeeperSlavesManagerStorage::parse to clean up code.
int ret;
@@ -431,7 +431,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::connected()
+Future<bool> ZooKeeperSlavesManagerStorage::connected()
{
int ret;
@@ -467,7 +467,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::reconnecting()
+Future<bool> ZooKeeperSlavesManagerStorage::reconnecting()
{
LOG(INFO) << "Slaves manager storage lost connection to ZooKeeper, "
<< "attempting to reconnect ...";
@@ -475,7 +475,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::reconnected()
+Future<bool> ZooKeeperSlavesManagerStorage::reconnected()
{
LOG(INFO) << "Slaves manager storage has reconnected ...";
@@ -485,7 +485,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::expired()
+Future<bool> ZooKeeperSlavesManagerStorage::expired()
{
LOG(WARNING) << "Slaves manager storage session expired!";
@@ -501,7 +501,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
}
-Promise<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
+Future<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
{
int ret;
string result;
@@ -636,19 +636,19 @@ SlavesManager::SlavesManager(const Confi
process::spawn(storage);
}
- // Set up our HTTP endpoints.
- installHttpHandler("add", &SlavesManager::add);
- installHttpHandler("remove", &SlavesManager::remove);
- installHttpHandler("activate", &SlavesManager::activate);
- installHttpHandler("deactivate", &SlavesManager::deactivate);
- installHttpHandler("activated", &SlavesManager::activated);
- installHttpHandler("deactivated", &SlavesManager::deactivated);
+ // Setup our HTTP endpoints.
+ route("add", &SlavesManager::add);
+ route("remove", &SlavesManager::remove);
+ route("activate", &SlavesManager::activate);
+ route("deactivate", &SlavesManager::deactivate);
+ route("activated", &SlavesManager::activated);
+ route("deactivated", &SlavesManager::deactivated);
}
SlavesManager::~SlavesManager()
{
- process::post(storage->self(), process::TERMINATE);
+ process::terminate(storage->self());
process::wait(storage->self());
delete storage;
}
@@ -668,7 +668,7 @@ bool SlavesManager::add(const string& ho
// Ignore request if slave is already active.
if (active.contains(hostname, port)) {
LOG(WARNING) << "Attempted to add an already added slave!";
- return true;
+ return false;
}
// Make sure this slave is not currently deactivated.
@@ -679,8 +679,13 @@ bool SlavesManager::add(const string& ho
}
// Ask the storage system to persist the addition.
- if (process::call(storage->self(), &SlavesManagerStorage::add,
- hostname, port)) {
+ Future<bool> added =
+ process::dispatch(storage->self(), &SlavesManagerStorage::add,
+ hostname, port);
+
+ added.await();
+
+ if (added.isReady() && added.get()) {
active.put(hostname, port);
// Tell the master that this slave is now active.
@@ -704,8 +709,13 @@ bool SlavesManager::remove(const string&
}
// Get the storage system to persist the removal.
- if (process::call(storage->self(), &SlavesManagerStorage::remove,
- hostname, port)) {
+ Future<bool> removed =
+ process::dispatch(storage->self(), &SlavesManagerStorage::remove,
+ hostname, port);
+
+ removed.await();
+
+ if (removed.isReady() && removed.get()) {
active.remove(hostname, port);
inactive.remove(hostname, port);
@@ -725,8 +735,13 @@ bool SlavesManager::activate(const strin
// Make sure the slave is currently deactivated.
if (inactive.contains(hostname, port)) {
// Get the storage system to persist the activation.
- if (process::call(storage->self(), &SlavesManagerStorage::activate,
- hostname, port)) {
+ Future<bool> activated =
+ process::dispatch(storage->self(), &SlavesManagerStorage::activate,
+ hostname, port);
+
+ activated.await();
+
+ if (activated.isReady() && activated.get()) {
active.put(hostname, port);
inactive.remove(hostname, port);
@@ -747,8 +762,13 @@ bool SlavesManager::deactivate(const str
// Make sure the slave is currently activated.
if (active.contains(hostname, port)) {
// Get the storage system to persist the deactivation.
- if (process::call(storage->self(), &SlavesManagerStorage::deactivate,
- hostname, port)) {
+ Future<bool> deactivated =
+ process::dispatch(storage->self(), &SlavesManagerStorage::deactivate,
+ hostname, port);
+
+ deactivated.await();
+
+ if (deactivated.isReady() && deactivated.get()) {
active.remove(hostname, port);
inactive.put(hostname, port);
@@ -795,7 +815,7 @@ void SlavesManager::updateInactive(
}
-Promise<HttpResponse> SlavesManager::add(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::add(const HttpRequest& request)
{
// Parse the query to get out the slave hostname and port.
string hostname = "";
@@ -838,7 +858,7 @@ Promise<HttpResponse> SlavesManager::add
}
-Promise<HttpResponse> SlavesManager::remove(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::remove(const HttpRequest& request)
{
// Parse the query to get out the slave hostname and port.
string hostname = "";
@@ -881,7 +901,7 @@ Promise<HttpResponse> SlavesManager::rem
}
-Promise<HttpResponse> SlavesManager::activate(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::activate(const HttpRequest& request)
{
// Parse the query to get out the slave hostname and port.
string hostname = "";
@@ -924,7 +944,7 @@ Promise<HttpResponse> SlavesManager::act
}
-Promise<HttpResponse> SlavesManager::deactivate(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::deactivate(const HttpRequest& request)
{
// Parse the query to get out the slave hostname and port.
string hostname = "";
@@ -967,7 +987,7 @@ Promise<HttpResponse> SlavesManager::dea
}
-Promise<HttpResponse> SlavesManager::activated(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::activated(const HttpRequest& request)
{
LOG(INFO) << "Slaves manager received HTTP request for activated slaves";
@@ -985,7 +1005,7 @@ Promise<HttpResponse> SlavesManager::act
}
-Promise<HttpResponse> SlavesManager::deactivated(const HttpRequest& request)
+Future<HttpResponse> SlavesManager::deactivated(const HttpRequest& request)
{
LOG(INFO) << "Slaves manager received HTTP request for deactivated slaves";
Modified: incubator/mesos/trunk/src/master/slaves_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.hpp Fri Jan 27 01:25:13 2012
@@ -36,10 +36,10 @@ class Master;
class SlavesManagerStorage : public process::Process<SlavesManagerStorage>
{
public:
- virtual process::Promise<bool> add(const std::string& hostname, uint16_t port) { return true; }
- virtual process::Promise<bool> remove(const std::string& hostname, uint16_t port) { return true; }
- virtual process::Promise<bool> activate(const std::string& hostname, uint16_t port) { return true; }
- virtual process::Promise<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Future<bool> add(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Future<bool> remove(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Future<bool> activate(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Future<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
};
@@ -61,12 +61,12 @@ public:
void updateInactive(const multihashmap<std::string, uint16_t>& updated);
private:
- process::Promise<process::HttpResponse> add(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> remove(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> activate(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> deactivate(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> activated(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> deactivated(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> add(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> remove(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> activate(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> deactivate(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> activated(const process::HttpRequest& request);
+ process::Future<process::HttpResponse> deactivated(const process::HttpRequest& request);
const process::PID<Master> master;
Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp Fri Jan 27 01:25:13 2012
@@ -175,13 +175,23 @@ void MesosExecutorDriverImpl_dealloc(Mes
{
if (self->driver != NULL) {
self->driver->stop();
+ // We need to wrap the driver destructor in an "allow threads"
+ // macro since the MesosExecutorDriver destructor waits for the
+ // ExecutorProcess to terminate and there might be a thread that
+ // is trying to acquire the GIL to call through the
+ // ProxyExecutor. It will only be after this thread executes that
+ // the ExecutorProcess might actually get a terminate.
+ Py_BEGIN_ALLOW_THREADS
delete self->driver;
+ Py_END_ALLOW_THREADS
self->driver = NULL;
}
+
if (self->proxyExecutor != NULL) {
delete self->proxyExecutor;
self->proxyExecutor = NULL;
}
+
MesosExecutorDriverImpl_clear(self);
self->ob_type->tp_free((PyObject*) self);
}
Modified: incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp Fri Jan 27 01:25:13 2012
@@ -215,7 +215,15 @@ void MesosSchedulerDriverImpl_dealloc(Me
{
if (self->driver != NULL) {
self->driver->stop();
+ // We need to wrap the driver destructor in an "allow threads"
+ // macro since the MesosSchedulerDriver destructor waits for the
+ // SchedulerProcess to terminate and there might be a thread that
+ // is trying to acquire the GIL to call through the
+ // ProxyScheduler. It will only be after this thread executes that
+ // the SchedulerProcess might actually get a terminate.
+ Py_BEGIN_ALLOW_THREADS
delete self->driver;
+ Py_END_ALLOW_THREADS
self->driver = NULL;
}
@@ -273,9 +281,9 @@ PyObject* MesosSchedulerDriverImpl_stop(
return NULL;
}
- bool failover = false;
+ bool failover = false; // Should match default in mesos.py.
- if (!PyArg_ParseTuple(args, "b", &failover)) {
+ if (!PyArg_ParseTuple(args, "|b", &failover)) {
return NULL;
}
Modified: incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp (original)
+++ incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp Fri Jan 27 01:25:13 2012
@@ -91,6 +91,7 @@ void ProxyScheduler::resourceOffers(Sche
(char*) "OO",
impl,
list);
+
if (res == NULL) {
cerr << "Failed to call scheduler's resourceOffer" << endl;
goto cleanup;
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Fri Jan 27 01:25:13 2012
@@ -30,8 +30,6 @@
#include <string>
#include <sstream>
-#include <tr1/functional>
-
#include <mesos/scheduler.hpp>
#include <process/dispatch.hpp>
@@ -65,10 +63,9 @@ using std::vector;
using process::wait; // Necessary on some OS's to disambiguate.
-using std::tr1::bind;
-
-namespace mesos { namespace internal {
+namespace mesos {
+namespace internal {
// The scheduler process (below) is responsible for interacting with
// the master and responding to Mesos API calls from scheduler
@@ -80,59 +77,63 @@ class SchedulerProcess : public Protobuf
{
public:
SchedulerProcess(MesosSchedulerDriver* _driver,
- Scheduler* _sched,
+ Scheduler* _scheduler,
const FrameworkID& _frameworkId,
- const FrameworkInfo& _framework)
+ const FrameworkInfo& _framework,
+ pthread_mutex_t* _mutex,
+ pthread_cond_t* _cond)
: driver(_driver),
- sched(_sched),
+ scheduler(_scheduler),
frameworkId(_frameworkId),
framework(_framework),
+ mutex(_mutex),
+ cond(_cond),
master(UPID()),
failover(!(_frameworkId == "")),
connected(false),
aborted(false)
{
- installProtobufHandler<NewMasterDetectedMessage>(
+ install<NewMasterDetectedMessage>(
&SchedulerProcess::newMasterDetected,
&NewMasterDetectedMessage::pid);
- installProtobufHandler<NoMasterDetectedMessage>(
+ install<NoMasterDetectedMessage>(
&SchedulerProcess::noMasterDetected);
- installProtobufHandler<FrameworkRegisteredMessage>(
+ install<FrameworkRegisteredMessage>(
&SchedulerProcess::registered,
&FrameworkRegisteredMessage::framework_id);
- installProtobufHandler<FrameworkReregisteredMessage>(
+ install<FrameworkReregisteredMessage>(
&SchedulerProcess::reregistered,
&FrameworkReregisteredMessage::framework_id);
- installProtobufHandler<ResourceOffersMessage>(
+ install<ResourceOffersMessage>(
&SchedulerProcess::resourceOffers,
&ResourceOffersMessage::offers,
&ResourceOffersMessage::pids);
- installProtobufHandler<RescindResourceOfferMessage>(
+ install<RescindResourceOfferMessage>(
&SchedulerProcess::rescindOffer,
&RescindResourceOfferMessage::offer_id);
- installProtobufHandler<StatusUpdateMessage>(
+ install<StatusUpdateMessage>(
&SchedulerProcess::statusUpdate,
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
- installProtobufHandler<LostSlaveMessage>(
+ install<LostSlaveMessage>(
&SchedulerProcess::lostSlave,
&LostSlaveMessage::slave_id);
- installProtobufHandler<ExecutorToFrameworkMessage>(
+ install<ExecutorToFrameworkMessage>(
&SchedulerProcess::frameworkMessage,
&ExecutorToFrameworkMessage::slave_id,
&ExecutorToFrameworkMessage::framework_id,
&ExecutorToFrameworkMessage::executor_id,
&ExecutorToFrameworkMessage::data);
- installProtobufHandler<FrameworkErrorMessage>(
+ install<FrameworkErrorMessage>(
&SchedulerProcess::error,
&FrameworkErrorMessage::code,
&FrameworkErrorMessage::message);
@@ -176,7 +177,7 @@ protected:
connected = true;
failover = false;
- invoke(bind(&Scheduler::registered, sched, driver, frameworkId));
+ scheduler->registered(driver, frameworkId);
}
void reregistered(const FrameworkID& frameworkId)
@@ -237,7 +238,7 @@ protected:
}
}
- invoke(bind(&Scheduler::resourceOffers, sched, driver, offers));
+ scheduler->resourceOffers(driver, offers);
}
void rescindOffer(const OfferID& offerId)
@@ -251,7 +252,8 @@ protected:
VLOG(1) << "Rescinded offer " << offerId;
savedOffers.erase(offerId);
- invoke(bind(&Scheduler::offerRescinded, sched, driver, offerId));
+
+ scheduler->offerRescinded(driver, offerId);
}
void statusUpdate(const StatusUpdate& update, const UPID& pid)
@@ -279,7 +281,7 @@ protected:
// multiple times (of course, if a scheduler re-uses a TaskID,
// that could be bad.
- invoke(bind(&Scheduler::statusUpdate, sched, driver, status));
+ scheduler->statusUpdate(driver, status);
// Send a status update acknowledgement ONLY if not aborted!
if (!aborted && pid) {
@@ -306,7 +308,8 @@ protected:
VLOG(1) << "Lost slave " << slaveId;
savedSlavePids.erase(slaveId);
- invoke(bind(&Scheduler::slaveLost, sched, driver, slaveId));
+
+ scheduler->slaveLost(driver, slaveId);
}
void frameworkMessage(const SlaveID& slaveId,
@@ -321,8 +324,7 @@ protected:
VLOG(1) << "Received framework message";
- invoke(bind(&Scheduler::frameworkMessage,
- sched, driver, slaveId, executorId, data));
+ scheduler->frameworkMessage(driver, slaveId, executorId, data);
}
void error(int32_t code, const string& message)
@@ -336,7 +338,7 @@ protected:
driver->abort();
- invoke(bind(&Scheduler::error, sched, driver, code, message));
+ scheduler->error(driver, code, message);
}
void stop(bool failover)
@@ -352,14 +354,21 @@ protected:
message.mutable_framework_id()->MergeFrom(frameworkId);
send(master, message);
}
+
+ Lock lock(mutex);
+ pthread_cond_signal(cond);
}
- // NOTE: This function stops any further callbacks from reaching the
- // scheduler by informing the master. The abort flag stops
- // those callbacks that are already enqueued.
+ // NOTE: This function informs the master to stop attempting to send
+ // messages to this scheduler. The abort flag stops any already
+ // enqueued messages or messages in flight from being handled. We
+ // don't want to terminate the process because one might do a
+ // MesosSchedulerDriver::stop later, which dispatches to
+ // SchedulerProcess::stop.
void abort()
{
VLOG(1) << "Aborting the framework";
+
aborted = true;
if (!connected) {
@@ -373,6 +382,8 @@ protected:
message.mutable_framework_id()->MergeFrom(frameworkId);
send(master, message);
+ Lock lock(mutex);
+ pthread_cond_signal(cond);
}
void killTask(const TaskID& taskId)
@@ -424,7 +435,7 @@ protected:
status->mutable_task_id()->MergeFrom(task.task_id());
status->set_state(TASK_LOST);
status->set_message("Master Disconnected");
- update.set_timestamp(elapsedTime());
+ update.set_timestamp(Clock::now());
update.set_uuid(UUID::random().toBytes());
statusUpdate(update, UPID());
@@ -517,9 +528,11 @@ private:
friend class mesos::MesosSchedulerDriver;
MesosSchedulerDriver* driver;
- Scheduler* sched;
+ Scheduler* scheduler;
FrameworkID frameworkId;
FrameworkInfo framework;
+ pthread_mutex_t* mutex;
+ pthread_cond_t* cond;
bool failover;
UPID master;
@@ -530,7 +543,8 @@ private:
hashmap<SlaveID, UPID> savedSlavePids;
};
-}} // namespace mesos { namespace internal {
+} // namespace internal {
+} // namespace mesos {
// Implementation of C++ API.
@@ -546,7 +560,7 @@ private:
// (2) There is a variable called state, that represents the current
// state of the driver and is used to enforce its state transitions.
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
const string& url,
@@ -563,15 +577,15 @@ MesosSchedulerDriver::MesosSchedulerDriv
} catch (ConfigurationException& e) {
// TODO(benh|matei): Are error callbacks not fatal!?
string message = string("Configuration error: ") + e.what();
- sched->error(this, 2, message);
+ scheduler->error(this, 2, message);
conf = new Configuration();
}
conf->set("url", url); // Override URL param with the one from the user
- init(sched, conf, frameworkId, frameworkName, executorInfo);
+ init(scheduler, conf, frameworkId, frameworkName, executorInfo);
}
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
const map<string, string> &params,
@@ -588,14 +602,14 @@ MesosSchedulerDriver::MesosSchedulerDriv
} catch (ConfigurationException& e) {
// TODO(benh|matei): Are error callbacks not fatal?
string message = string("Configuration error: ") + e.what();
- sched->error(this, 2, message);
+ scheduler->error(this, 2, message);
conf = new Configuration();
}
- init(sched, conf, frameworkId, frameworkName, executorInfo);
+ init(scheduler, conf, frameworkId, frameworkName, executorInfo);
}
-MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* sched,
+MesosSchedulerDriver::MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
int argc,
@@ -612,14 +626,14 @@ MesosSchedulerDriver::MesosSchedulerDriv
conf = new Configuration(configurator.load(argc, argv, false));
} catch (ConfigurationException& e) {
string message = string("Configuration error: ") + e.what();
- sched->error(this, 2, message);
+ scheduler->error(this, 2, message);
conf = new Configuration();
}
- init(sched, conf, frameworkId, frameworkName, executorInfo);
+ init(scheduler, conf, frameworkId, frameworkName, executorInfo);
}
-void MesosSchedulerDriver::init(Scheduler* _sched,
+void MesosSchedulerDriver::init(Scheduler* _scheduler,
Configuration* _conf,
const FrameworkID& _frameworkId,
const std::string& _frameworkName,
@@ -627,7 +641,7 @@ void MesosSchedulerDriver::init(Schedule
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
- sched = _sched;
+ scheduler = _scheduler;
conf = _conf;
frameworkId = _frameworkId;
frameworkName = _frameworkName;
@@ -667,7 +681,9 @@ MesosSchedulerDriver::~MesosSchedulerDri
// that we could add a method to libprocess that told us whether or
// not this was about to be deadlock, and possibly report this back
// to the user somehow. Note that we will also wait forever if
- // MesosSchedulerDriver::stop was never called.
+ // MesosSchedulerDriver::stop was never called. It might make sense
+ // to try and add some more debug output for the case where we wait
+ // indefinitely due to deadlock ...
if (process != NULL) {
wait(process);
delete process;
@@ -714,7 +730,10 @@ Status MesosSchedulerDriver::start()
CHECK(process == NULL);
- process = new SchedulerProcess(this, sched, frameworkId, framework);
+ // TODO(benh): Consider using a libprocess Latch rather than a
+ // pthread mutex and condition variable for signaling.
+ process = new SchedulerProcess(this, scheduler, frameworkId,
+ framework, &mutex, &cond);
UPID pid = spawn(process);
@@ -755,8 +774,6 @@ Status MesosSchedulerDriver::stop(bool f
// local clusters to exist (i.e. not use global vars in local.cpp) so that
// ours can just be an instance variable in MesosSchedulerDriver.
- pthread_cond_signal(&cond);
-
return OK;
}
@@ -779,8 +796,6 @@ Status MesosSchedulerDriver::abort()
state = ABORTED;
- pthread_cond_signal(&cond);
-
return OK;
}
@@ -801,11 +816,30 @@ Status MesosSchedulerDriver::join()
pthread_cond_wait(&cond, &mutex);
}
- if (state == ABORTED)
+ if (state == ABORTED) {
return DRIVER_ABORTED;
+ }
CHECK(state == STOPPED);
+// // If the driver has been stopped, then we wait for SchedulerProcess
+// // to terminate. This is necessary to support languages like Python
+// // that use reference counting on their objects. Without doing this
+// // the Python interpreter might attempt to delete the
+// // MesosSchedulerDriver instance from within the context of the
+// // SchedulerProcess (because a thread is trying to call into a
+// // scheduler but there are no more references to
+// // MesosSchedulerDriver), which means we would deadlock and wait
+// // forever since the MesosSchedulerDriver destructor waits for the
+// // process to terminate forever. Note that we don't need to wait if
+// // the driver has been aborted because we won't actually call back
+// // in to the driver in that case, and thus we won't have
+
+// lock.unlock(); // Let a thread in SchedulerProcess use the driver.
+
+// CHECK(process != NULL);
+// wait(process);
+
return OK;
}
@@ -895,12 +929,6 @@ Status MesosSchedulerDriver::sendFramewo
}
-void MesosSchedulerDriver::error(int code, const string& message)
-{
- sched->error(this, code, message);
-}
-
-
Status MesosSchedulerDriver::requestResources(
const vector<ResourceRequest>& requests)
{
Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Fri Jan 27 01:25:13 2012
@@ -30,9 +30,9 @@
#include "slave/http.hpp"
#include "slave/slave.hpp"
+using process::Future;
using process::HttpResponse;
using process::HttpRequest;
-using process::Promise;
using std::string;
@@ -113,7 +113,7 @@ JSON::Object model(const Framework& fram
namespace http {
-Promise<HttpResponse> vars(
+Future<HttpResponse> vars(
const Slave& slave,
const HttpRequest& request)
{
@@ -146,7 +146,7 @@ Promise<HttpResponse> vars(
namespace json {
-Promise<HttpResponse> stats(
+Future<HttpResponse> stats(
const Slave& slave,
const HttpRequest& request)
{
@@ -175,7 +175,7 @@ Promise<HttpResponse> stats(
}
-Promise<HttpResponse> state(
+Future<HttpResponse> state(
const Slave& slave,
const HttpRequest& request)
{
Modified: incubator/mesos/trunk/src/slave/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.hpp (original)
+++ incubator/mesos/trunk/src/slave/http.hpp Fri Jan 27 01:25:13 2012
@@ -34,7 +34,7 @@ namespace http {
// Returns current vars in "key value\n" format (keys do not contain
// spaces, values may contain spaces but are ended by a newline).
-process::Promise<process::HttpResponse> vars(
+process::Future<process::HttpResponse> vars(
const Slave& slave,
const process::HttpRequest& request);
@@ -42,13 +42,13 @@ process::Promise<process::HttpResponse>
namespace json {
// Returns current statistics of the slave.
-process::Promise<process::HttpResponse> stats(
+process::Future<process::HttpResponse> stats(
const Slave& slave,
const process::HttpRequest& request);
// Returns current state of the cluster that the slave knows about.
-process::Promise<process::HttpResponse> state(
+process::Future<process::HttpResponse> state(
const Slave& slave,
const process::HttpRequest& request);
Modified: incubator/mesos/trunk/src/slave/reaper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.cpp (original)
+++ incubator/mesos/trunk/src/slave/reaper.cpp Fri Jan 27 01:25:13 2012
@@ -20,6 +20,7 @@
#include <sys/wait.h>
#include <process/dispatch.hpp>
+#include <process/timer.hpp>
#include "reaper.hpp"
@@ -28,7 +29,9 @@
using namespace process;
-namespace mesos { namespace internal { namespace slave {
+namespace mesos {
+namespace internal {
+namespace slave {
Reaper::Reaper() {}
@@ -43,24 +46,28 @@ void Reaper::addProcessExitedListener(
}
-void Reaper::operator () ()
+void Reaper::initialize()
{
- while (true) {
- serve(1);
- if (name() == TIMEOUT) {
- // Check whether any child process has exited.
- pid_t pid;
- int status;
- if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
- foreach (const PID<ProcessExitedListener>& listener, listeners) {
- dispatch(listener, &ProcessExitedListener::processExited,
- pid, status);
- }
- }
- } else if (name() == TERMINATE) {
- return;
+ delay(1.0, self(), &Reaper::reap);
+}
+
+
+void Reaper::reap()
+{
+ // Check whether any child process has exited.
+ pid_t pid;
+ int status;
+ if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
+ foreach (const PID<ProcessExitedListener>& listener, listeners) {
+ dispatch(listener, &ProcessExitedListener::processExited, pid, status);
}
}
+
+ delay(1.0, self(), &Reaper::reap); // Reap forever!
}
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
Modified: incubator/mesos/trunk/src/slave/reaper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/reaper.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/reaper.hpp (original)
+++ incubator/mesos/trunk/src/slave/reaper.hpp Fri Jan 27 01:25:13 2012
@@ -24,7 +24,9 @@
#include <process/process.hpp>
-namespace mesos { namespace internal { namespace slave {
+namespace mesos {
+namespace internal {
+namespace slave {
class ProcessExitedListener : public process::Process<ProcessExitedListener>
{
@@ -42,13 +44,17 @@ public:
void addProcessExitedListener(const process::PID<ProcessExitedListener>&);
protected:
- virtual void operator () ();
+ virtual void initialize();
+
+ void reap();
private:
std::set<process::PID<ProcessExitedListener> > listeners;
};
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
#endif // __REAPER_HPP__
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Jan 27 01:25:13 2012
@@ -76,9 +76,7 @@ Slave::Slave(const Resources& _resources
resources(_resources),
local(_local),
isolationModule(_isolationModule)
-{
- initialize();
-}
+{}
Slave::Slave(const Configuration& _conf,
@@ -94,8 +92,6 @@ Slave::Slave(const Configuration& _conf,
attributes =
Attributes::parse(conf.get<string>("attributes", ""));
-
- initialize();
}
@@ -152,6 +148,43 @@ void Slave::registerOptions(Configurator
void Slave::initialize()
{
+ LOG(INFO) << "Slave started at " << self();
+ LOG(INFO) << "Slave resources: " << resources;
+
+ Result<string> result = utils::os::hostname();
+
+ if (result.isError()) {
+ LOG(FATAL) << "Failed to get hostname: " << result.error();
+ }
+
+ CHECK(result.isSome());
+
+ string hostname = result.get();
+
+ // Check and see if we have a different public DNS name. Normally
+ // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+ // environment variable. This allows the master to display our
+ // public name in its webui.
+ string webui_hostname = hostname;
+ if (getenv("MESOS_PUBLIC_DNS") != NULL) {
+ webui_hostname = getenv("MESOS_PUBLIC_DNS");
+ }
+
+ // Initialize slave info.
+ info.set_hostname(hostname);
+ info.set_webui_hostname(webui_hostname);
+ info.set_webui_port(conf.get<int>("webui_port", 8081));
+ info.mutable_resources()->MergeFrom(resources);
+ info.mutable_attributes()->MergeFrom(attributes);
+
+ // Spawn and initialize the isolation module.
+ // TODO(benh): Seems like the isolation module should really be
+ // spawned before being passed to the slave.
+ spawn(isolationModule);
+ dispatch(isolationModule,
+ &IsolationModule::initialize,
+ conf, local, self());
+
// Start all the statistics at 0.
CHECK(TASK_STARTING == TaskState_MIN);
CHECK(TASK_LOST == TaskState_MAX);
@@ -166,151 +199,104 @@ void Slave::initialize()
stats.validFrameworkMessages = 0;
stats.invalidFrameworkMessages = 0;
- startTime = elapsedTime();
+ startTime = Clock::now();
+
connected = false;
// Install protobuf handlers.
- installProtobufHandler<NewMasterDetectedMessage>(
+ install<NewMasterDetectedMessage>(
&Slave::newMasterDetected,
&NewMasterDetectedMessage::pid);
- installProtobufHandler<NoMasterDetectedMessage>(
+ install<NoMasterDetectedMessage>(
&Slave::noMasterDetected);
- installProtobufHandler<SlaveRegisteredMessage>(
+ install<SlaveRegisteredMessage>(
&Slave::registered,
&SlaveRegisteredMessage::slave_id);
- installProtobufHandler<SlaveReregisteredMessage>(
+ install<SlaveReregisteredMessage>(
&Slave::reregistered,
&SlaveReregisteredMessage::slave_id);
- installProtobufHandler<RunTaskMessage>(
+ install<RunTaskMessage>(
&Slave::runTask,
&RunTaskMessage::framework,
&RunTaskMessage::framework_id,
&RunTaskMessage::pid,
&RunTaskMessage::task);
- installProtobufHandler<KillTaskMessage>(
+ install<KillTaskMessage>(
&Slave::killTask,
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
- installProtobufHandler<ShutdownFrameworkMessage>(
+ install<ShutdownFrameworkMessage>(
&Slave::shutdownFramework,
&ShutdownFrameworkMessage::framework_id);
- installProtobufHandler<FrameworkToExecutorMessage>(
+ install<FrameworkToExecutorMessage>(
&Slave::schedulerMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
- installProtobufHandler<UpdateFrameworkMessage>(
+ install<UpdateFrameworkMessage>(
&Slave::updateFramework,
&UpdateFrameworkMessage::framework_id,
&UpdateFrameworkMessage::pid);
- installProtobufHandler<StatusUpdateAcknowledgementMessage>(
+ install<StatusUpdateAcknowledgementMessage>(
&Slave::statusUpdateAcknowledgement,
&StatusUpdateAcknowledgementMessage::slave_id,
&StatusUpdateAcknowledgementMessage::framework_id,
&StatusUpdateAcknowledgementMessage::task_id,
&StatusUpdateAcknowledgementMessage::uuid);
- installProtobufHandler<RegisterExecutorMessage>(
+ install<RegisterExecutorMessage>(
&Slave::registerExecutor,
&RegisterExecutorMessage::framework_id,
&RegisterExecutorMessage::executor_id);
- installProtobufHandler<StatusUpdateMessage>(
+ install<StatusUpdateMessage>(
&Slave::statusUpdate,
&StatusUpdateMessage::update);
- installProtobufHandler<ExecutorToFrameworkMessage>(
+ install<ExecutorToFrameworkMessage>(
&Slave::executorMessage,
&ExecutorToFrameworkMessage::slave_id,
&ExecutorToFrameworkMessage::framework_id,
&ExecutorToFrameworkMessage::executor_id,
&ExecutorToFrameworkMessage::data);
- // Install some message handlers.
- installMessageHandler(process::EXITED, &Slave::exited);
- installMessageHandler("PING", &Slave::ping);
-
- // Install some HTTP handlers.
- installHttpHandler(
- "vars",
- bind(&http::vars, cref(*this), params::_1));
-
- installHttpHandler(
- "stats.json",
- bind(&http::json::stats, cref(*this), params::_1));
-
- installHttpHandler(
- "state.json",
- bind(&http::json::state, cref(*this), params::_1));
+ install<ShutdownMessage>(
+ &Slave::shutdown);
+
+ // Install the ping message handler.
+ install("PING", &Slave::ping);
+
+ // Setup some HTTP routes.
+ route("vars", bind(&http::vars, cref(*this), params::_1));
+ route("stats.json", bind(&http::json::stats, cref(*this), params::_1));
+ route("state.json", bind(&http::json::state, cref(*this), params::_1));
}
-void Slave::operator () ()
-{
- LOG(INFO) << "Slave started at " << self();
- LOG(INFO) << "Slave resources: " << resources;
-
- Result<string> result = utils::os::hostname();
-
- if (result.isError()) {
- LOG(FATAL) << "Failed to get hostname: " << result.error();
- }
-
- CHECK(result.isSome());
-
- string hostname = result.get();
-
- // Check and see if we have a different public DNS name. Normally
- // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
- // environment variable. This allows the master to display our
- // public name in its web UI.
- string webui_hostname = hostname;
- if (getenv("MESOS_PUBLIC_DNS") != NULL) {
- webui_hostname = getenv("MESOS_PUBLIC_DNS");
- }
-
- // Initialize slave info.
- info.set_hostname(hostname);
- info.set_webui_hostname(webui_hostname);
- info.set_webui_port(conf.get<int>("webui_port", 8081));
- info.mutable_resources()->MergeFrom(resources);
- info.mutable_attributes()->MergeFrom(attributes);
-
- // Spawn and initialize the isolation module.
- // TODO(benh): Seems like the isolation module should really be
- // spawned before being passed to the slave.
- spawn(isolationModule);
- dispatch(isolationModule,
- &IsolationModule::initialize,
- conf, local, self());
-
- while (true) {
- serve(1);
- if (name() == TERMINATE) {
- LOG(INFO) << "Asked to terminate by " << from();
- foreachkey (const FrameworkID& frameworkId, frameworks) {
- // TODO(benh): Because a shut down isn't instantaneous (but has
- // a shut down/kill phases) we might not actually propogate all
- // the status updates appropriately here. Consider providing
- // an alternative function which skips the shut down phase and
- // simply does a kill (sending all status updates
- // immediately). Of course, this still isn't sufficient
- // because those status updates might get lost and we won't
- // resend them unless we build that into the system.
- shutdownFramework(frameworkId);
- }
- break;
- }
+void Slave::finalize()
+{
+ LOG(INFO) << "Slave terminating";
+
+ foreachkey (const FrameworkID& frameworkId, frameworks) {
+ // TODO(benh): Because a shut down isn't instantaneous (but has
+ // a shut down/kill phases) we might not actually propagate all
+ // the status updates appropriately here. Consider providing
+ // an alternative function which skips the shut down phase and
+ // simply does a kill (sending all status updates
+ // immediately). Of course, this still isn't sufficient
+ // because those status updates might get lost and we won't
+ // resend them unless we build that into the system.
+ shutdownFramework(frameworkId);
}
// Stop the isolation module.
@@ -319,6 +305,13 @@ void Slave::operator () ()
}
+void Slave::shutdown()
+{
+ LOG(INFO) << "Slave asked to shut down";
+ terminate(self());
+}
+
+
void Slave::newMasterDetected(const UPID& pid)
{
LOG(INFO) << "New master detected at " << pid;
@@ -432,7 +425,7 @@ void Slave::runTask(const FrameworkInfo&
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
status->set_state(TASK_LOST);
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(master, message);
} else if (!executor->pid) {
@@ -505,7 +498,7 @@ void Slave::killTask(const FrameworkID&
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(TASK_LOST);
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(master, message);
@@ -528,7 +521,7 @@ void Slave::killTask(const FrameworkID&
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(TASK_LOST);
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(master, message);
} else if (!executor->pid) {
@@ -548,7 +541,7 @@ void Slave::killTask(const FrameworkID&
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(TASK_KILLED);
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(master, message);
} else {
@@ -717,7 +710,7 @@ void Slave::statusUpdateAcknowledgement(
// message.set_reliable(true);
// send(master, message);
-// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
// }
// }
// }
@@ -735,7 +728,7 @@ void Slave::registerExecutor(const Frame
LOG(WARNING) << "Framework " << frameworkId
<< " does not exist (it may have been killed),"
<< " telling executor to exit";
- send(from(), ShutdownExecutorMessage());
+ reply(ShutdownExecutorMessage());
return;
}
@@ -745,15 +738,15 @@ void Slave::registerExecutor(const Frame
if (executor == NULL) {
LOG(WARNING) << "WARNING! Unexpected executor '" << executorId
<< "' registering for framework " << frameworkId;
- send(from(), ShutdownExecutorMessage());
+ reply(ShutdownExecutorMessage());
} else if (executor->pid) {
LOG(WARNING) << "WARNING! executor '" << executorId
<< "' of framework " << frameworkId
<< " is already running";
- send(from(), ShutdownExecutorMessage());
+ reply(ShutdownExecutorMessage());
} else {
// Save the pid for the executor.
- executor->pid = from();
+ executor->pid = from;
// First account for the tasks we're about to start.
foreachvalue (const TaskDescription& task, executor->queuedTasks) {
@@ -911,7 +904,7 @@ void Slave::registerExecutor(const Frame
// message.set_reliable(true);
// send(master, message);
-// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
// }
// stats.tasks[status.state()]++;
@@ -1003,9 +996,9 @@ void Slave::executorMessage(const SlaveI
}
-void Slave::ping()
+void Slave::ping(const UPID& from, const string& body)
{
- send(from(), "PONG");
+ send(from, "PONG");
}
@@ -1035,7 +1028,7 @@ void Slave::statusUpdateTimeout(
// void Slave::timeout()
// {
// // Check and see if we should re-send any status updates.
-// double now = elapsedTime();
+// double now = Clock::now();
// foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
// CHECK(stream->timeout > 0);
@@ -1058,11 +1051,11 @@ void Slave::statusUpdateTimeout(
// }
-void Slave::exited()
+void Slave::exited(const UPID& pid)
{
- LOG(INFO) << "Process exited: " << from();
+ LOG(INFO) << "Process exited: " << from;
- if (from() == master) {
+ if (master == pid) {
LOG(WARNING) << "WARNING! Master disconnected!"
<< " Waiting for a new master to be elected.";
// TODO(benh): After so long waiting for a master, commit suicide.
@@ -1155,7 +1148,7 @@ Framework* Slave::getFramework(const Fra
// message.set_reliable(true);
// send(master, message);
-// stream->timeout = elapsedTime() + STATUS_UPDATE_RETRY_INTERVAL;
+// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
// }
// return stream;
@@ -1417,27 +1410,21 @@ string Slave::createUniqueWorkDirectory(
// framework on this slave).
out << "/runs/";
- string dir;
- dir = out.str();
+ const string& prefix = out.str();
for (int i = 0; i < INT_MAX; i++) {
out << i;
- DIR* d = opendir(out.str().c_str());
- if (d == NULL && errno == ENOENT) {
- break;
+ VLOG(1) << "Checking if " << out.str() << " already exists";
+ if (!utils::os::exists(out.str())) {
+ bool created = utils::os::mkdir(out.str());
+ CHECK(created) << "Error creating work directory: " << out.str();
+ return out.str();
+ } else {
+ out.str(prefix); // Try with prefix again.
}
- closedir(d);
- out.str(dir);
}
-
- dir = out.str();
-
- bool created = utils::os::mkdir(out.str());
-
- CHECK(created) << "Error creating work directory: " << dir;
-
- return dir;
}
-
-}}} // namespace mesos { namespace internal { namespace slave {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Fri Jan 27 01:25:13 2012
@@ -61,6 +61,8 @@ public:
static void registerOptions(Configurator* configurator);
+ void shutdown();
+
void newMasterDetected(const UPID& pid);
void noMasterDetected();
void masterDetectionFailure();
@@ -91,8 +93,7 @@ public:
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::string& data);
- void ping();
- void exited();
+ void ping(const UPID& from, const std::string& body);
void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
@@ -105,9 +106,9 @@ public:
int status);
protected:
- virtual void operator () ();
-
- void initialize();
+ virtual void initialize();
+ virtual void finalize();
+ virtual void exited(const UPID& pid);
// Helper routine to lookup a framework.
Framework* getFramework(const FrameworkID& frameworkId);
@@ -141,15 +142,15 @@ private:
// Http handlers, friends of the slave in order to access state,
// they get invoked from within the slave so there is no need to
// use synchronization mechanisms to protect state.
- friend Promise<HttpResponse> http::vars(
+ friend Future<HttpResponse> http::vars(
const Slave& slave,
const HttpRequest& request);
- friend Promise<HttpResponse> http::json::stats(
+ friend Future<HttpResponse> http::json::stats(
const Slave& slave,
const HttpRequest& request);
- friend Promise<HttpResponse> http::json::state(
+ friend Future<HttpResponse> http::json::state(
const Slave& slave,
const HttpRequest& request);
Modified: incubator/mesos/trunk/src/tests/exception_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/exception_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/exception_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/exception_tests.cpp Fri Jan 27 01:25:13 2012
@@ -57,7 +57,6 @@ using testing::Eq;
using testing::Not;
using testing::Return;
using testing::SaveArg;
-using testing::SaveArgPointee;
TEST(ExceptionTest, AbortOnFrameworkError)
@@ -109,7 +108,7 @@ TEST(ExceptionTest, DeactiveFrameworkOnA
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -131,7 +130,7 @@ TEST(ExceptionTest, DeactiveFrameworkOnA
trigger deactivateMsg;
- EXPECT_MSG(filter, Eq(DeactivateFrameworkMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(DeactivateFrameworkMessage().GetTypeName()), _, _)
.WillOnce(DoAll(Trigger(&deactivateMsg), Return(false)));
driver.start();
@@ -197,7 +196,7 @@ TEST(ExceptionTest, DisallowSchedulerCal
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -236,13 +235,14 @@ TEST(ExceptionTest, DisallowSchedulerCal
process::Message message;
trigger rescindMsg, unregisterMsg;
- EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(SaveArgPointee<0>(&message),Return(false)));
+ EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
+ Return(false)));
- EXPECT_MSG(filter, Eq(RescindResourceOfferMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(RescindResourceOfferMessage().GetTypeName()), _, _)
.WillOnce(Trigger(&rescindMsg));
- EXPECT_MSG(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(UnregisterFrameworkMessage().GetTypeName()), _, _)
.WillOnce(Trigger(&unregisterMsg));
driver.start();
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Fri Jan 27 01:25:13 2012
@@ -44,6 +44,7 @@ using mesos::internal::slave::ProcessBas
using mesos::internal::slave::Slave;
using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_SECONDS;
+using process::Clock;
using process::PID;
using std::string;
@@ -59,7 +60,6 @@ using testing::Eq;
using testing::Not;
using testing::Return;
using testing::SaveArg;
-using testing::SaveArgPointee;
TEST(FaultToleranceTest, SlaveLost)
@@ -108,7 +108,7 @@ TEST(FaultToleranceTest, SlaveLost)
EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
.WillOnce(Trigger(&slaveLostCall));
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
WAIT_UNTIL(offerRescindedCall);
WAIT_UNTIL(slaveLostCall);
@@ -118,7 +118,7 @@ TEST(FaultToleranceTest, SlaveLost)
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -127,12 +127,12 @@ TEST(FaultToleranceTest, SlavePartitione
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- process::Clock::pause();
+ Clock::pause();
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -140,7 +140,7 @@ TEST(FaultToleranceTest, SlavePartitione
MockScheduler sched;
MesosSchedulerDriver driver(&sched, "", DEFAULT_EXECUTOR_INFO, master);
- trigger slaveLostCall;
+ trigger slaveLostCall, pingMsg;
EXPECT_CALL(sched, registered(&driver, _))
.Times(1);
@@ -154,14 +154,20 @@ TEST(FaultToleranceTest, SlavePartitione
EXPECT_CALL(sched, slaveLost(&driver, _))
.WillOnce(Trigger(&slaveLostCall));
- EXPECT_MSG(filter, Eq("PONG"), _, _)
+ EXPECT_MESSAGE(filter, Eq("PING"), _, _)
+ .WillRepeatedly(DoAll(Trigger(&pingMsg),
+ Return(false)));
+
+ EXPECT_MESSAGE(filter, Eq("PONG"), _, _)
.WillRepeatedly(Return(true));
driver.start();
+ WAIT_UNTIL(pingMsg);
+
double secs = master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS;
- process::Clock::advance(secs);
+ Clock::advance(secs);
WAIT_UNTIL(slaveLostCall);
@@ -172,7 +178,7 @@ TEST(FaultToleranceTest, SlavePartitione
process::filter(NULL);
- process::Clock::resume();
+ Clock::resume();
}
@@ -249,7 +255,7 @@ TEST(FaultToleranceTest, FrameworkReliab
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -270,7 +276,7 @@ TEST(FaultToleranceTest, FrameworkReliab
// Drop the registered message, in the first attempt, but allow subsequent
// tries.
- EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
.WillOnce(Return(true))
.WillRepeatedly(Return(false));
@@ -294,7 +300,7 @@ TEST(FaultToleranceTest, FrameworkReregi
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false);
@@ -316,11 +322,11 @@ TEST(FaultToleranceTest, FrameworkReregi
process::Message message;
trigger schedReregisteredMsg;
- EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(SaveArgPointee<0>(&message),
+ EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
Return(false)));
- EXPECT_MSG(filter, Eq(FrameworkReregisteredMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(FrameworkReregisteredMessage().GetTypeName()), _, _)
.WillOnce(DoAll(Trigger(&schedReregisteredMsg),
Return(false)));
@@ -358,7 +364,7 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
SimpleAllocator a;
@@ -410,8 +416,8 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
process::Message message;
- EXPECT_MSG(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(SaveArgPointee<0>(&message),
+ EXPECT_MESSAGE(filter, Eq(FrameworkRegisteredMessage().GetTypeName()), _, _)
+ .WillOnce(DoAll(SaveArgField<0>(&process::MessageEvent::message, &message),
Return(false)));
driver.start();
@@ -442,10 +448,10 @@ TEST(FaultToleranceTest, DISABLED_TaskLo
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
process::filter(NULL);
@@ -456,12 +462,12 @@ TEST(FaultToleranceTest, SchedulerFailov
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- process::Clock::pause();
+ Clock::pause();
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
SimpleAllocator a;
@@ -516,7 +522,7 @@ TEST(FaultToleranceTest, SchedulerFailov
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
- EXPECT_MSG(filter, Eq(StatusUpdateMessage().GetTypeName()), _,
+ EXPECT_MESSAGE(filter, Eq(StatusUpdateMessage().GetTypeName()), _,
Not(AnyOf(Eq(master), Eq(slave))))
.WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
.RetiresOnSaturation();
@@ -561,7 +567,7 @@ TEST(FaultToleranceTest, SchedulerFailov
WAIT_UNTIL(registeredCall);
- process::Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_SECONDS);
+ Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_SECONDS);
WAIT_UNTIL(statusUpdateCall);
@@ -571,15 +577,15 @@ TEST(FaultToleranceTest, SchedulerFailov
driver1.join();
driver2.join();
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
process::filter(NULL);
- process::Clock::resume();
+ Clock::resume();
}
@@ -686,10 +692,10 @@ TEST(FaultToleranceTest, SchedulerFailov
driver1.join();
driver2.join();
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -701,7 +707,7 @@ TEST(FaultToleranceTest, SlaveReliableRe
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
SimpleAllocator a;
@@ -732,7 +738,7 @@ TEST(FaultToleranceTest, SlaveReliableRe
trigger slaveRegisterMsg;
// Drop the first registered message, but allow subsequent messages.
- EXPECT_MSG(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(SlaveRegisteredMessage().GetTypeName()), _, _)
.WillOnce(Return(true))
.WillRepeatedly(DoAll(Trigger(&slaveRegisterMsg), Return(false)));
@@ -743,10 +749,10 @@ TEST(FaultToleranceTest, SlaveReliableRe
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
process::filter(NULL);
@@ -760,7 +766,7 @@ TEST(FaultToleranceTest, SlaveReregister
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
SimpleAllocator a;
@@ -790,7 +796,7 @@ TEST(FaultToleranceTest, SlaveReregister
trigger slaveReRegisterMsg;
- EXPECT_MSG(filter, Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(SlaveReregisteredMessage().GetTypeName()), _, _)
.WillOnce(DoAll(Trigger(&slaveReRegisterMsg), Return(false)));
driver.start();
@@ -810,10 +816,10 @@ TEST(FaultToleranceTest, SlaveReregister
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
process::filter(NULL);
Modified: incubator/mesos/trunk/src/tests/log_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/log_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/log_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/log_tests.cpp Fri Jan 27 01:25:13 2012
@@ -679,10 +679,10 @@ TEST(CoordinatorTest, NotLearnedFill)
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
- EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
const std::string path1 = utils::os::getcwd() + "/.log1";
@@ -807,10 +807,10 @@ TEST(CoordinatorTest, MultipleAppendsNot
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
- EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
const std::string path1 = utils::os::getcwd() + "/.log1";
@@ -947,10 +947,10 @@ TEST(CoordinatorTest, TruncateNotLearned
MockFilter filter;
process::filter(&filter);
- EXPECT_MSG(filter, _, _, _)
+ EXPECT_MESSAGE(filter, _, _, _)
.WillRepeatedly(Return(false));
- EXPECT_MSG(filter, Eq(LearnedMessage().GetTypeName()), _, _)
+ EXPECT_MESSAGE(filter, Eq(LearnedMessage().GetTypeName()), _, _)
.WillRepeatedly(Return(true));
const std::string path1 = utils::os::getcwd() + "/.log1";
Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Fri Jan 27 01:25:13 2012
@@ -48,9 +48,9 @@ using mesos::internal::master::SimpleAll
using mesos::internal::slave::Slave;
-using process::PID;
+using process::Clock;
using process::Future;
-using process::Promise;
+using process::PID;
using std::string;
using std::map;
@@ -74,6 +74,8 @@ TEST(MasterTest, TaskRunning)
MockExecutor exec;
+ trigger shutdownCall;
+
EXPECT_CALL(exec, init(_, _))
.Times(1);
@@ -81,7 +83,7 @@ TEST(MasterTest, TaskRunning)
.WillOnce(SendStatusUpdate(TASK_RUNNING));
EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(Trigger(&shutdownCall));
map<ExecutorID, Executor*> execs;
execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -144,10 +146,12 @@ TEST(MasterTest, TaskRunning)
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ WAIT_UNTIL(shutdownCall); // To ensure we can deallocate MockExecutor.
+
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -162,7 +166,7 @@ TEST(MasterTest, KillTask)
MockExecutor exec;
- trigger killTaskCall;
+ trigger killTaskCall, shutdownCall;
EXPECT_CALL(exec, init(_, _))
.Times(1);
@@ -174,7 +178,7 @@ TEST(MasterTest, KillTask)
.WillOnce(Trigger(&killTaskCall));
EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(Trigger(&shutdownCall));
map<ExecutorID, Executor*> execs;
execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -238,10 +242,12 @@ TEST(MasterTest, KillTask)
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ WAIT_UNTIL(shutdownCall); // To ensure we can deallocate MockExecutor.
+
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -260,7 +266,7 @@ TEST(MasterTest, FrameworkMessage)
ExecutorArgs args;
string execData;
- trigger execFrameworkMessageCall;
+ trigger execFrameworkMessageCall, shutdownCall;
EXPECT_CALL(exec, init(_, _))
.WillOnce(DoAll(SaveArg<0>(&execDriver), SaveArg<1>(&args)));
@@ -273,7 +279,7 @@ TEST(MasterTest, FrameworkMessage)
Trigger(&execFrameworkMessageCall)));
EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(Trigger(&shutdownCall));
map<ExecutorID, Executor*> execs;
execs[DEFAULT_EXECUTOR_ID] = &exec;
@@ -356,10 +362,12 @@ TEST(MasterTest, FrameworkMessage)
schedDriver.stop();
schedDriver.join();
- process::post(slave, process::TERMINATE);
+ WAIT_UNTIL(shutdownCall); // To ensure we can deallocate MockExecutor.
+
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -374,7 +382,7 @@ TEST(MasterTest, MultipleExecutors)
MockExecutor exec1;
TaskDescription exec1Task;
- trigger exec1LaunchTaskCall;
+ trigger exec1LaunchTaskCall, exec1ShutdownCall;
EXPECT_CALL(exec1, init(_, _))
.Times(1);
@@ -385,11 +393,11 @@ TEST(MasterTest, MultipleExecutors)
SendStatusUpdate(TASK_RUNNING)));
EXPECT_CALL(exec1, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(Trigger(&exec1ShutdownCall));
MockExecutor exec2;
TaskDescription exec2Task;
- trigger exec2LaunchTaskCall;
+ trigger exec2LaunchTaskCall, exec2ShutdownCall;
EXPECT_CALL(exec2, init(_, _))
.Times(1);
@@ -400,7 +408,7 @@ TEST(MasterTest, MultipleExecutors)
SendStatusUpdate(TASK_RUNNING)));
EXPECT_CALL(exec2, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(Trigger(&exec2ShutdownCall));
ExecutorID executorId1;
executorId1.set_value("executor-1");
@@ -488,10 +496,13 @@ TEST(MasterTest, MultipleExecutors)
driver.stop();
driver.join();
- process::post(slave, process::TERMINATE);
+ WAIT_UNTIL(exec1ShutdownCall); // Wait for shutdown so MockExecutor can be deallocated.
+ WAIT_UNTIL(exec2ShutdownCall); // Wait for shutdown so MockExecutor can be deallocated.
+
+ process::terminate(slave);
process::wait(slave);
- process::post(master, process::TERMINATE);
+ process::terminate(master);
process::wait(master);
}
@@ -504,10 +515,10 @@ public:
// We need this typedef because MOCK_METHOD is a macro.
typedef map<FrameworkID, FrameworkInfo> Map_FrameworkId_FrameworkInfo;
- MOCK_METHOD0(list, Promise<Result<Map_FrameworkId_FrameworkInfo> >());
- MOCK_METHOD2(add, Promise<Result<bool> >(const FrameworkID&,
- const FrameworkInfo&));
- MOCK_METHOD1(remove, Promise<Result<bool> >(const FrameworkID&));
+ MOCK_METHOD0(list, Future<Result<Map_FrameworkId_FrameworkInfo> >());
+ MOCK_METHOD2(add, Future<Result<bool> >(const FrameworkID&,
+ const FrameworkInfo&));
+ MOCK_METHOD1(remove, Future<Result<bool> >(const FrameworkID&));
};
@@ -601,15 +612,17 @@ TEST_F(FrameworksManagerTestFixture, Add
TEST_F(FrameworksManagerTestFixture, RemoveFramework)
{
+ Clock::pause();
+
// Remove a non-existent framework.
FrameworkID id;
id.set_value("non-existent framework");
- Future<Result<bool> > future =
- process::dispatch(manager, &FrameworksManager::remove, id, 0);
+ Future<Result<bool> > future1 =
+ process::dispatch(manager, &FrameworksManager::remove, id, seconds(0));
- ASSERT_TRUE(future.await(2.0));
- EXPECT_TRUE(future.get().isError());
+ ASSERT_TRUE(future1.await(2.0));
+ EXPECT_TRUE(future1.get().isError());
// Remove an existing framework.
@@ -630,7 +643,9 @@ TEST_F(FrameworksManagerTestFixture, Rem
// Now remove the added framework.
Future<Result<bool> > future3 =
- process::dispatch(manager, &FrameworksManager::remove, id2, 1.0);
+ process::dispatch(manager, &FrameworksManager::remove, id2, seconds(1.0));
+
+ Clock::update(Clock::now(manager) + 1.0);
ASSERT_TRUE(future3.await(2.0));
EXPECT_TRUE(future2.get().get());
@@ -641,6 +656,8 @@ TEST_F(FrameworksManagerTestFixture, Rem
ASSERT_TRUE(future4.await(2.0));
EXPECT_FALSE(future4.get().get());
+
+ Clock::resume();
}
@@ -650,11 +667,11 @@ TEST_F(FrameworksManagerTestFixture, Res
FrameworkID id;
id.set_value("non-existent framework");
- Future<Result<bool> > future =
+ Future<Result<bool> > future1 =
process::dispatch(manager, &FrameworksManager::resurrect, id);
- ASSERT_TRUE(future.await(2.0));
- EXPECT_FALSE(future.get().get());
+ ASSERT_TRUE(future1.await(2.0));
+ EXPECT_FALSE(future1.get().get());
// Resurrect an existent framework that is NOT being removed.
// Add a dummy framework.
@@ -680,9 +697,8 @@ TEST_F(FrameworksManagerTestFixture, Res
}
-// NOTE: In the following tests, with paused clocks, future.await() may wait
-// forever. This makes debugging failed tests hard.
-// TODO(vinod) Need better constructs.
+// TODO(vinod): Using a paused clock in the tests means that
+// future.await() may wait forever. This makes debugging hard.
TEST_F(FrameworksManagerTestFixture, ResurrectExpiringFramework)
{
// This is the crucial test.
@@ -698,28 +714,27 @@ TEST_F(FrameworksManagerTestFixture, Res
info.set_user("test user");
// Add the framework.
- Future<Result<bool> > future =
- process::dispatch(manager, &FrameworksManager::add, id, info);
+ process::dispatch(manager, &FrameworksManager::add, id, info);
- process::Clock::pause();
+ Clock::pause();
// Remove after 2 secs.
- Future<Result<bool> > future2 =
- process::dispatch(manager, &FrameworksManager::remove, id, 2.0);
+ Future<Result<bool> > future1 =
+ process::dispatch(manager, &FrameworksManager::remove, id, seconds(2.0));
// Resurrect in the meanwhile.
- Future<Result<bool> > future3 =
+ Future<Result<bool> > future2 =
process::dispatch(manager, &FrameworksManager::resurrect, id);
- ASSERT_TRUE(future3.await());
- EXPECT_TRUE(future3.get().get());
+ ASSERT_TRUE(future2.await(2.0));
+ EXPECT_TRUE(future2.get().get());
- process::Clock::advance(2.0);
+ Clock::update(Clock::now(manager) + 2.0);
- ASSERT_TRUE(future2.await());
- EXPECT_FALSE(future2.get().get());
+ ASSERT_TRUE(future1.await(2.0));
+ EXPECT_FALSE(future1.get().get());
- process::Clock::resume();
+ Clock::resume();
}
@@ -738,36 +753,35 @@ TEST_F(FrameworksManagerTestFixture, Res
info.set_user("test user");
// Add the framework.
- Future<Result<bool> > future =
- process::dispatch(manager, &FrameworksManager::add, id, info);
+ process::dispatch(manager, &FrameworksManager::add, id, info);
- process::Clock::pause();
+ Clock::pause();
- Future<Result<bool> > future2 =
- process::dispatch(manager, &FrameworksManager::remove, id, 2.0);
+ Future<Result<bool> > future1 =
+ process::dispatch(manager, &FrameworksManager::remove, id, seconds(2.0));
// Resurrect in the meanwhile.
- Future<Result<bool> > future3 =
+ Future<Result<bool> > future2 =
process::dispatch(manager, &FrameworksManager::resurrect, id);
// Remove again.
- Future<Result<bool> > future4 =
- process::dispatch(manager, &FrameworksManager::remove, id, 1.0);
+ Future<Result<bool> > future3 =
+ process::dispatch(manager, &FrameworksManager::remove, id, seconds(1.0));
- ASSERT_TRUE(future3.await());
- EXPECT_TRUE(future3.get().get());
+ ASSERT_TRUE(future2.await(2.0));
+ EXPECT_TRUE(future2.get().get());
- process::Clock::advance(1.0);
+ Clock::update(Clock::now(manager) + 1.0);
- ASSERT_TRUE(future4.await());
- EXPECT_TRUE(future4.get().get());
+ ASSERT_TRUE(future3.await(2.0));
+ EXPECT_TRUE(future3.get().get());
- process::Clock::advance(1.0);
+ Clock::update(Clock::now(manager) + 2.0);
- ASSERT_TRUE(future2.await());
- EXPECT_FALSE(future2.get().get());
+ ASSERT_TRUE(future1.await(2.0));
+ EXPECT_FALSE(future1.get().get());
- process::Clock::resume();
+ Clock::resume();
}
@@ -797,12 +811,12 @@ TEST(FrameworksManagerTest, CacheFailure
process::spawn(manager);
// Test if initially FrameworksManager returns error.
- Future<Result<map<FrameworkID, FrameworkInfo> > > future =
+ Future<Result<map<FrameworkID, FrameworkInfo> > > future1 =
process::dispatch(manager, &FrameworksManager::list);
- ASSERT_TRUE(future.await(2.0));
- ASSERT_TRUE(future.get().isError());
- EXPECT_EQ(future.get().error(), "Error caching framework infos.");
+ ASSERT_TRUE(future1.await(2.0));
+ ASSERT_TRUE(future1.get().isError());
+ EXPECT_EQ(future1.get().error(), "Error caching framework infos.");
// Add framework should function normally despite caching failure.
FrameworkID id;
@@ -821,7 +835,7 @@ TEST(FrameworksManagerTest, CacheFailure
// Remove framework should fail due to caching failure.
Future<Result<bool> > future3 =
- process::dispatch(manager, &FrameworksManager::remove, id, 0);
+ process::dispatch(manager, &FrameworksManager::remove, id, seconds(0));
ASSERT_TRUE(future3.await(2.0));
ASSERT_TRUE(future3.get().isError());
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Fri Jan 27 01:25:13 2012
@@ -40,7 +40,9 @@
#include "slave/slave.hpp"
-namespace mesos { namespace internal { namespace test {
+namespace mesos {
+namespace internal {
+namespace test {
/**
* The location of the Mesos source directory. Used by tests to locate
@@ -161,7 +163,22 @@ public:
class MockFilter : public process::Filter
{
public:
- MOCK_METHOD1(filter, bool(process::Message *));
+ MockFilter()
+ {
+ EXPECT_CALL(*this, filter(testing::A<const process::MessageEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const process::DispatchEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const process::HttpEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ EXPECT_CALL(*this, filter(testing::A<const process::ExitedEvent&>()))
+ .WillRepeatedly(testing::Return(false));
+ }
+
+ MOCK_METHOD1(filter, bool(const process::MessageEvent&));
+ MOCK_METHOD1(filter, bool(const process::DispatchEvent&));
+ MOCK_METHOD1(filter, bool(const process::HttpEvent&));
+ MOCK_METHOD1(filter, bool(const process::ExitedEvent&));
};
@@ -171,19 +188,29 @@ public:
*/
MATCHER_P3(MsgMatcher, name, from, to, "")
{
- return (testing::Matcher<std::string>(name).Matches(arg->name) &&
- testing::Matcher<process::UPID>(from).Matches(arg->from) &&
- testing::Matcher<process::UPID>(to).Matches(arg->to));
+ const process::MessageEvent& event = ::std::tr1::get<0>(arg);
+ return (testing::Matcher<std::string>(name).Matches(event.message->name) &&
+ testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
+ testing::Matcher<process::UPID>(to).Matches(event.message->to));
}
/**
* This macro provides some syntactic sugar for matching messages
* using the message matcher (see above) as well as the MockFilter
- * (see above).
+ * (see above). We should also add EXPECT_DISPATCH, EXPECT_HTTP, etc.
*/
-#define EXPECT_MSG(mockFilter, name, from, to) \
- EXPECT_CALL(mockFilter, filter(MsgMatcher(name, from, to)))
+#define EXPECT_MESSAGE(mockFilter, name, from, to) \
+ EXPECT_CALL(mockFilter, filter(testing::A<const process::MessageEvent&>())) \
+ .With(MsgMatcher(name, from, to))
+
+
+ACTION_TEMPLATE(SaveArgField,
+ HAS_1_TEMPLATE_PARAMS(int, k),
+ AND_2_VALUE_PARAMS(field, pointer))
+{
+ *pointer = *(::std::tr1::get<k>(args).*field);
+}
/**
@@ -317,7 +344,8 @@ private:
process::PID<slave::Slave> slave;
};
-}}} // namespace mesos { namespace internal { namespace test {
-
+} // namespace test {
+} // namespace internal {
+} // namespace mesos {
#endif // __TESTING_UTILS_HPP__
Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Fri Jan 27 01:25:13 2012
@@ -47,12 +47,12 @@ public:
void initialize();
// Group implementation.
- Promise<Group::Membership> join(const string& info);
- Promise<bool> cancel(const Group::Membership& membership);
- Promise<string> info(const Group::Membership& membership);
- Promise<set<Group::Membership> > watch(
+ Future<Group::Membership> join(const string& info);
+ Future<bool> cancel(const Group::Membership& membership);
+ Future<string> info(const Group::Membership& membership);
+ Future<set<Group::Membership> > watch(
const set<Group::Membership>& expected);
- Promise<Option<int64_t> > session();
+ Future<Option<int64_t> > session();
// ZooKeeper events.
void connected(bool reconnect);
@@ -137,10 +137,10 @@ private:
};
struct {
- queue<Join> joins;
- queue<Cancel> cancels;
- queue<Info> infos;
- queue<Watch> watches;
+ queue<Join*> joins;
+ queue<Cancel*> cancels;
+ queue<Info*> infos;
+ queue<Watch*> watches;
} pending;
bool retrying;
@@ -185,16 +185,16 @@ void GroupProcess::initialize()
}
-Promise<Group::Membership> GroupProcess::join(const string& info)
+Future<Group::Membership> GroupProcess::join(const string& info)
{
if (error.isSome()) {
Promise<Group::Membership> promise;
promise.fail(error.get());
- return promise;
+ return promise.future();
} else if (state != CONNECTED) {
- Join join(info);
+ Join* join = new Join(info);
pending.joins.push(join);
- return join.promise;
+ return join->promise.future();
}
// TODO(benh): Write a test to see how ZooKeeper fails setting znode
@@ -212,13 +212,13 @@ Promise<Group::Membership> GroupProcess:
delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
retrying = true;
}
- Join join(info);
+ Join* join = new Join(info);
pending.joins.push(join);
- return join.promise;
+ return join->promise.future();
} else if (membership.isError()) {
Promise<Group::Membership> promise;
promise.fail(membership.error());
- return promise;
+ return promise.future();
}
owned.insert(make_pair(membership.get(), info));
@@ -227,20 +227,20 @@ Promise<Group::Membership> GroupProcess:
}
-Promise<bool> GroupProcess::cancel(const Group::Membership& membership)
+Future<bool> GroupProcess::cancel(const Group::Membership& membership)
{
if (error.isSome()) {
Promise<bool> promise;
promise.fail(error.get());
- return promise;
+ return promise.future();
} else if (owned.count(membership) == 0) {
return false; // TODO(benh): Should this be an error?
}
if (state != CONNECTED) {
- Cancel cancel(membership);
+ Cancel* cancel = new Cancel(membership);
pending.cancels.push(cancel);
- return cancel.promise;
+ return cancel->promise.future();
}
// TODO(benh): Only attempt if the pending queue is empty so that a
@@ -254,29 +254,29 @@ Promise<bool> GroupProcess::cancel(const
delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
retrying = true;
}
- Cancel cancel(membership);
+ Cancel* cancel = new Cancel(membership);
pending.cancels.push(cancel);
- return cancel.promise;
+ return cancel->promise.future();
} else if (cancellation.isError()) {
Promise<bool> promise;
promise.fail(cancellation.error());
- return promise;
+ return promise.future();
}
return cancellation.get();
}
-Promise<string> GroupProcess::info(const Group::Membership& membership)
+Future<string> GroupProcess::info(const Group::Membership& membership)
{
if (error.isSome()) {
Promise<string> promise;
promise.fail(error.get());
- return promise;
+ return promise.future();
} else if (state != CONNECTED) {
- Info info(membership);
+ Info* info = new Info(membership);
pending.infos.push(info);
- return info.promise;
+ return info->promise.future();
}
// TODO(benh): Only attempt if the pending queue is empty so that a
@@ -286,30 +286,30 @@ Promise<string> GroupProcess::info(const
Result<string> result = doInfo(membership);
if (result.isNone()) { // Try again later.
- Info info(membership);
+ Info* info = new Info(membership);
pending.infos.push(info);
- return info.promise;
+ return info->promise.future();
} else if (result.isError()) {
Promise<string> promise;
promise.fail(result.error());
- return promise;
+ return promise.future();
}
return result.get();
}
-Promise<set<Group::Membership> > GroupProcess::watch(
+Future<set<Group::Membership> > GroupProcess::watch(
const set<Group::Membership>& expected)
{
if (error.isSome()) {
Promise<set<Group::Membership> > promise;
promise.fail(error.get());
- return promise;
+ return promise.future();
} else if (state != CONNECTED) {
- Watch watch(expected);
+ Watch* watch = new Watch(expected);
pending.watches.push(watch);
- return watch.promise;
+ return watch->promise.future();
}
// To guarantee causality, we must invalidate our cache of
@@ -330,25 +330,25 @@ Promise<set<Group::Membership> > GroupPr
delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
retrying = true;
}
- Watch watch(expected);
+ Watch* watch = new Watch(expected);
pending.watches.push(watch);
- return watch.promise;
+ return watch->promise.future();
} else if (memberships.get() == expected) { // Just wait for updates.
- Watch watch(expected);
+ Watch* watch = new Watch(expected);
pending.watches.push(watch);
- return watch.promise;
+ return watch->promise.future();
}
return memberships.get();
}
-Promise<Option<int64_t> > GroupProcess::session()
+Future<Option<int64_t> > GroupProcess::session()
{
if (error.isSome()) {
Promise<Option<int64_t> > promise;
promise.fail(error.get());
- return promise;
+ return promise.future();
} else if (state != CONNECTED) {
return Option<int64_t>::none();
}
@@ -622,12 +622,14 @@ void GroupProcess::update()
CHECK(memberships.isSome());
size_t size = pending.watches.size();
for (int i = 0; i < size; i++) {
- if (memberships.get() != pending.watches.front().expected) {
- pending.watches.front().promise.set(memberships.get());
+ if (memberships.get() != pending.watches.front()->expected) {
+ pending.watches.front()->promise.set(memberships.get());
} else {
pending.watches.push(pending.watches.front());
}
+ Watch* watch = pending.watches.front();
pending.watches.pop();
+ delete watch; // FIXME: also deletes a watch re-queued by the else branch.
}
}
@@ -639,43 +641,49 @@ bool GroupProcess::sync()
// Do joins.
while (!pending.joins.empty()) {
- Result<Group::Membership> membership = doJoin(pending.joins.front().info);
+ Result<Group::Membership> membership = doJoin(pending.joins.front()->info);
if (membership.isNone()) {
return false; // Try again later.
} else if (membership.isError()) {
- pending.joins.front().promise.fail(membership.error());
+ pending.joins.front()->promise.fail(membership.error());
} else {
- owned.insert(make_pair(membership.get(), pending.joins.front().info));
- pending.joins.front().promise.set(membership.get());
+ owned.insert(make_pair(membership.get(), pending.joins.front()->info));
+ pending.joins.front()->promise.set(membership.get());
}
+ Join* join = pending.joins.front();
pending.joins.pop();
+ delete join;
}
// Do cancels.
while (!pending.cancels.empty()) {
- Result<bool> cancellation = doCancel(pending.cancels.front().membership);
+ Result<bool> cancellation = doCancel(pending.cancels.front()->membership);
if (cancellation.isNone()) {
return false; // Try again later.
} else if (cancellation.isError()) {
- pending.cancels.front().promise.fail(cancellation.error());
+ pending.cancels.front()->promise.fail(cancellation.error());
} else {
- pending.cancels.front().promise.set(cancellation.get());
+ pending.cancels.front()->promise.set(cancellation.get());
}
+ Cancel* cancel = pending.cancels.front();
pending.cancels.pop();
+ delete cancel;
}
// Do infos.
while (!pending.infos.empty()) {
// TODO(benh): Ignore if future has been discarded?
- Result<string> result = doInfo(pending.infos.front().membership);
+ Result<string> result = doInfo(pending.infos.front()->membership);
if (result.isNone()) {
return false; // Try again later.
} else if (result.isError()) {
- pending.infos.front().promise.fail(result.error());
+ pending.infos.front()->promise.fail(result.error());
} else {
- pending.infos.front().promise.set(result.get());
+ pending.infos.front()->promise.set(result.get());
}
+ Info* info = pending.infos.front();
pending.infos.pop();
+ delete info;
}
// Get cache of memberships if we don't have one.
@@ -708,11 +716,13 @@ void GroupProcess::retry(double seconds)
template <typename T>
-void fail(queue<T>* queue, const string& message)
+void fail(queue<T*>* queue, const string& message)
{
while (!queue->empty()) {
- queue->front().promise.fail(message);
+ T* t = queue->front();
queue->pop();
+ t->promise.fail(message);
+ delete t;
}
}