You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:15:52 UTC
svn commit: r1132272 [2/2] - in /incubator/mesos/trunk/src: common/
detector/ exec/ local/ master/ messaging/ sched/ slave/ tests/
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 09:15:51 2011
@@ -44,6 +44,9 @@ using boost::unordered_set;
using google::protobuf::RepeatedPtrField;
+using process::PID;
+using process::UPID;
+
using std::map;
using std::string;
using std::vector;
@@ -63,10 +66,11 @@ namespace mesos { namespace internal {
#define STATUS_UPDATE_TIMEOUT 120
-class StatusUpdateTimer : public MesosProcess
+class StatusUpdateTimer : public MesosProcess<StatusUpdateTimer>
{
public:
- StatusUpdateTimer(const PID &_sched, const FrameworkID& _frameworkId,
+ StatusUpdateTimer(const PID<SchedulerProcess> &_sched,
+ const FrameworkID& _frameworkId,
const TaskDescription& task)
: sched(_sched), frameworkId(_frameworkId), taskId(task.task_id()),
slaveId(task.slave_id()), terminate(false) {}
@@ -100,7 +104,7 @@ protected:
}
private:
- const PID sched;
+ const PID<SchedulerProcess> sched;
const FrameworkID frameworkId;
const TaskID taskId;
const SlaveID slaveId;
@@ -114,14 +118,14 @@ private:
// we allow friend functions to invoke 'send', 'post', etc. Therefore,
// we must make sure that any necessary synchronization is performed.
-class SchedulerProcess : public MesosProcess
+class SchedulerProcess : public MesosProcess<SchedulerProcess>
{
public:
SchedulerProcess(MesosSchedulerDriver* _driver, Scheduler* _sched,
const FrameworkID& _frameworkId,
const FrameworkInfo& _framework)
: driver(_driver), sched(_sched), frameworkId(_frameworkId),
- framework(_framework), generation(0), master(PID()), terminate(false)
+ framework(_framework), generation(0), master(UPID()), terminate(false)
{
install(NEW_MASTER_DETECTED, &SchedulerProcess::newMasterDetected,
&NewMasterDetectedMessage::pid);
@@ -160,7 +164,7 @@ public:
{
// Cleanup any remaining timers.
foreachpair (const TaskID& taskId, StatusUpdateTimer* timer, timers) {
- send(timer->self(), TERMINATE);
+ send(timer->self(), process::TERMINATE);
wait(timer->self());
delete timer;
}
@@ -249,13 +253,15 @@ protected:
// TODO(benh): Better error codes/messages!
int32_t code = 1;
const string& message = "Failed to detect master(s)";
- invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
+ process::invoke(bind(&Scheduler::error, sched, driver, code,
+ cref(message)));
}
void registerReply(const FrameworkID& frameworkId)
{
this->frameworkId = frameworkId;
- invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
+ process::invoke(bind(&Scheduler::registered, sched, driver,
+ cref(frameworkId)));
}
void resourceOffer(const OfferID& offerId,
@@ -267,19 +273,20 @@ protected:
CHECK(offers.size() == pids.size());
for (int i = 0; i < offers.size(); i++) {
- PID pid(pids[i]);
- CHECK(pid != PID());
+ UPID pid(pids[i]);
+ CHECK(pid != UPID());
savedOffers[offerId][offers[i].slave_id()] = pid;
}
- invoke(bind(&Scheduler::resourceOffer, sched, driver, cref(offerId),
- cref(offers)));
+ process::invoke(bind(&Scheduler::resourceOffer, sched, driver,
+ cref(offerId), cref(offers)));
}
void rescindOffer(const OfferID& offerId)
{
savedOffers.erase(offerId);
- invoke(bind(&Scheduler::offerRescinded, sched, driver, cref(offerId)));
+ process::invoke(bind(&Scheduler::offerRescinded, sched, driver,
+ cref(offerId)));
}
void statusUpdate(const FrameworkID& frameworkId, const TaskStatus& status)
@@ -299,14 +306,15 @@ protected:
if (timers.count(status.task_id()) > 0) {
StatusUpdateTimer* timer = timers[status.task_id()];
timers.erase(status.task_id());
- send(timer->self(), TERMINATE);
+ send(timer->self(), process::TERMINATE);
wait(timer->self());
delete timer;
}
- invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
+ process::invoke(bind(&Scheduler::statusUpdate, sched, driver,
+ cref(status)));
- // Acknowledge the message (we do this last, after we invoked
+ // Acknowledge the message (we do this last, after we invoked
// the scheduler, if we did at all, in case it causes a crash,
// since this way the message might get resent/routed after
// the scheduler comes back online).
@@ -320,17 +328,18 @@ protected:
void lostSlave(const SlaveID& slaveId)
{
savedSlavePids.erase(slaveId);
- invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
+ process::invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
}
void frameworkMessage(const FrameworkMessage& message)
{
- invoke(bind(&Scheduler::frameworkMessage, sched, driver, cref(message)));
+ process::invoke(bind(&Scheduler::frameworkMessage, sched, driver,
+ cref(message)));
}
void error(int32_t code, const string& message)
{
- invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
+ process::invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
}
void stop()
@@ -415,8 +424,8 @@ protected:
// accepted.
if (savedSlavePids.count(message.slave_id()) > 0) {
- PID slave = savedSlavePids[message.slave_id()];
- CHECK(slave != PID());
+ UPID slave = savedSlavePids[message.slave_id()];
+ CHECK(slave != UPID());
// TODO(benh): This is kind of wierd, M2S?
MSG<M2S_FRAMEWORK_MESSAGE> out;
@@ -442,13 +451,13 @@ private:
FrameworkID frameworkId;
FrameworkInfo framework;
int32_t generation;
- PID master;
+ UPID master;
volatile bool active;
volatile bool terminate;
- unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
- unordered_map<SlaveID, PID> savedSlavePids;
+ unordered_map<OfferID, unordered_map<SlaveID, UPID> > savedOffers;
+ unordered_map<SlaveID, UPID> savedSlavePids;
// Timers to ensure we get a status update for each task we launch.
unordered_map<TaskID, StatusUpdateTimer *> timers;
@@ -554,6 +563,9 @@ void MesosSchedulerDriver::init(Schedule
pthread_cond_init(&cond, 0);
// TODO(benh): Initialize glog.
+
+ // Initialize libprocess library (but not glog; see the TODO above).
+ process::initialize(false);
}
@@ -573,7 +585,7 @@ MesosSchedulerDriver::~MesosSchedulerDri
// not this was about to be deadlock, and possibly report this back
// to the user somehow.
if (process != NULL) {
- Process::wait(process->self());
+ process::wait(process->self());
delete process;
}
@@ -588,8 +600,9 @@ MesosSchedulerDriver::~MesosSchedulerDri
delete conf;
// Check and see if we need to shutdown a local cluster.
- if (url == "local" || url == "localquiet")
+ if (url == "local" || url == "localquiet") {
local::shutdown();
+ }
}
@@ -622,15 +635,15 @@ int MesosSchedulerDriver::start()
process = new SchedulerProcess(this, sched, frameworkId, framework);
- PID pid = Process::spawn(process);
+ UPID pid = process::spawn(process);
// Check and see if we need to launch a local cluster.
if (url == "local") {
- PID master = local::launch(*conf, true);
+ const PID<master::Master>& master = local::launch(*conf, true);
detector = new BasicMasterDetector(master, pid);
} else if (url == "localquiet") {
conf->set("quiet", 1);
- PID master = local::launch(*conf, true);
+ const PID<master::Master>& master = local::launch(*conf, true);
detector = new BasicMasterDetector(master, pid);
} else {
detector = MasterDetector::create(url, pid, false, false);
@@ -654,7 +667,7 @@ int MesosSchedulerDriver::stop()
// getExecutorInfo which threw exceptions, or explicitely called
// stop. See above in start).
if (process != NULL) {
- Process::dispatch(process, &SchedulerProcess::stop);
+ process::dispatch(process->self(), &SchedulerProcess::stop);
process->terminate = true;
process = NULL;
}
@@ -698,7 +711,8 @@ int MesosSchedulerDriver::killTask(const
return -1;
}
- Process::dispatch(process, &SchedulerProcess::killTask, taskId);
+ process::dispatch(process->self(), &SchedulerProcess::killTask,
+ taskId);
return 0;
}
@@ -714,7 +728,7 @@ int MesosSchedulerDriver::replyToOffer(c
return -1;
}
- Process::dispatch(process, &SchedulerProcess::replyToOffer,
+ process::dispatch(process->self(), &SchedulerProcess::replyToOffer,
offerId, tasks, params);
return 0;
@@ -729,7 +743,7 @@ int MesosSchedulerDriver::reviveOffers()
return -1;
}
- Process::dispatch(process, &SchedulerProcess::reviveOffers);
+ process::dispatch(process->self(), &SchedulerProcess::reviveOffers);
return 0;
}
@@ -754,7 +768,8 @@ int MesosSchedulerDriver::sendFrameworkM
return -1;
}
- Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, message);
+ process::dispatch(process->self(), &SchedulerProcess::sendFrameworkMessage,
+ message);
return 0;
}
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Sun Jun 5 09:15:51 2011
@@ -19,7 +19,7 @@ using boost::unordered_map;
class LxcIsolationModule : public IsolationModule {
public:
// Reaps framework containers and tells the slave if they exit
- class Reaper : public Process {
+ class Reaper : public process::Process<Reaper> {
LxcIsolationModule* module;
protected:
Modified: incubator/mesos/trunk/src/slave/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/main.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/main.cpp (original)
+++ incubator/mesos/trunk/src/slave/main.cpp Sun Jun 5 09:15:51 2011
@@ -3,6 +3,8 @@
#include "configurator/configurator.hpp"
+#include "detector/detector.hpp"
+
#include "isolation_module_factory.hpp"
#include "slave.hpp"
#include "webui.hpp"
@@ -37,13 +39,15 @@ void usage(const char *programName, cons
int main(int argc, char **argv)
{
Configurator configurator;
+ Logging::registerOptions(&configurator);
+ Slave::registerOptions(&configurator);
+ configurator.addOption<int>("port", 'p', "Port to listen on", 5050);
+ configurator.addOption<string>("ip", "IP address to listen on");
configurator.addOption<string>("url", 'u', "Master URL");
configurator.addOption<string>("isolation", 'i', "Isolation module name", "process");
#ifdef MESOS_WEBUI
configurator.addOption<int>("webui_port", 'w', "Web UI port", 8081);
#endif
- Logging::registerOptions(&configurator);
- Slave::registerOptions(&configurator);
if (argc == 2 && string("--help") == argv[1]) {
usage(argv[0], configurator);
@@ -60,6 +64,17 @@ int main(int argc, char **argv)
Logging::init(argv[0], conf);
+ if (conf.contains("port")) {
+ setenv("LIBPROCESS_PORT", conf["port"].c_str(), 1);
+ }
+
+ if (conf.contains("ip")) {
+ setenv("LIBPROCESS_IP", conf["ip"].c_str(), 1);
+ }
+
+ // Initialize libprocess library (but not glog, done above).
+ process::initialize(false);
+
if (!conf.contains("url")) {
cerr << "Master URL argument (--url) required." << endl;
exit(1);
@@ -78,27 +93,25 @@ int main(int argc, char **argv)
LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
LOG(INFO) << "Starting Mesos slave";
- if (chdir(dirname(argv[0])) != 0)
+ if (chdir(dirname(argv[0])) != 0) {
fatalerror("Could not chdir into %s", dirname(argv[0]));
+ }
Slave* slave = new Slave(conf, false, isolationModule);
- Process::spawn(slave);
+ process::spawn(slave);
MasterDetector* detector =
MasterDetector::create(url, slave->self(), false, Logging::isQuiet(conf));
#ifdef MESOS_WEBUI
- startSlaveWebUI(slave, conf);
+ startSlaveWebUI(slave->self(), conf);
#endif
- Process::wait(slave->self());
- MasterDetector::destroy(detector);
- IsolationModule::destroy(isolationModule);
-
- delete isolationModule;
- delete detector;
+ process::wait(slave->self());
delete slave;
+ MasterDetector::destroy(detector);
+ IsolationModule::destroy(isolationModule);
return 0;
}
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun 5 09:15:51 2011
@@ -39,8 +39,8 @@ ProcessBasedIsolationModule::~ProcessBas
// could thus lead to a seg fault!
if (initialized) {
CHECK(reaper != NULL);
- Process::post(reaper->self(), TERMINATE);
- Process::wait(reaper->self());
+ process::post(reaper->self(), process::TERMINATE);
+ process::wait(reaper->self());
delete reaper;
}
}
@@ -50,7 +50,7 @@ void ProcessBasedIsolationModule::initia
{
this->slave = slave;
reaper = new Reaper(this);
- Process::spawn(reaper);
+ process::spawn(reaper);
initialized = true;
}
@@ -150,7 +150,7 @@ void ProcessBasedIsolationModule::Reaper
link(module->slave->self());
while (true) {
receive(1);
- if (name() == TIMEOUT) {
+ if (name() == process::TIMEOUT) {
// Check whether any child process has exited.
pid_t pid;
int status;
@@ -172,7 +172,7 @@ void ProcessBasedIsolationModule::Reaper
}
}
}
- } else if (name() == TERMINATE || name() == EXIT) {
+ } else if (name() == process::TERMINATE || name() == process::EXIT) {
return;
}
}
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Sun Jun 5 09:15:51 2011
@@ -30,7 +30,7 @@ public:
virtual void resourcesChanged(Framework* framework, Executor* executor);
// Reaps child processes and tells the slave if they exit
- class Reaper : public Process {
+ class Reaper : public process::Process<Reaper> {
ProcessBasedIsolationModule* module;
protected:
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:15:51 2011
@@ -23,6 +23,9 @@ using namespace mesos::internal::slave;
using boost::unordered_map;
using boost::unordered_set;
+using process::Promise;
+using process::UPID;
+
using std::list;
using std::make_pair;
using std::ostringstream;
@@ -80,7 +83,7 @@ Slave::~Slave()
}
-Result<state::SlaveState*> Slave::getState()
+Promise<state::SlaveState*> Slave::getState()
{
Resources resources(resources);
Resource::Scalar cpus;
@@ -456,7 +459,7 @@ void Slave::operator () ()
LOG(WARNING) << "Not expecting executor '" << msg.executor_id()
<< "' of framework " << msg.framework_id();
send(from(), S2E_KILL_EXECUTOR);
- } else if (executor->pid != PID()) {
+ } else if (executor->pid != UPID()) {
LOG(WARNING) << "Not good, executor '" << msg.executor_id()
<< "' of framework " << msg.framework_id()
<< " is already running";
@@ -612,7 +615,7 @@ void Slave::sendQueuedTasks(Framework* f
LOG(INFO) << "Flushing queued tasks for framework "
<< framework->frameworkId;
- CHECK(executor->pid != PID());
+ CHECK(executor->pid != UPID());
foreach (const TaskDescription& task, executor->queuedTasks) {
// Add the task to the executor.
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun 5 09:15:51 2011
@@ -40,8 +40,6 @@
#include "configurator/configurator.hpp"
-#include "detector/detector.hpp"
-
#include "messaging/messages.hpp"
@@ -57,7 +55,7 @@ const double STATUS_UPDATE_RETRY_TIMEOUT
struct Executor
{
Executor(const FrameworkID& _frameworkId, const ExecutorInfo& _info)
- : frameworkId(_frameworkId), info(_info), pid(PID()) {}
+ : frameworkId(_frameworkId), info(_info), pid(process::UPID()) {}
~Executor()
{
@@ -111,7 +109,7 @@ struct Executor
const FrameworkID frameworkId;
const ExecutorInfo info;
- PID pid;
+ process::UPID pid;
std::list<TaskDescription> queuedTasks;
boost::unordered_map<TaskID, Task*> tasks;
@@ -128,7 +126,7 @@ struct Executor
struct Framework
{
Framework( const FrameworkID& _frameworkId, const FrameworkInfo& _info,
- const PID& _pid)
+ const process::UPID& _pid)
: frameworkId(_frameworkId), info(_info), pid(_pid) {}
~Framework() {}
@@ -173,7 +171,7 @@ struct Framework
const FrameworkID frameworkId;
const FrameworkInfo info;
- PID pid;
+ process::UPID pid;
boost::unordered_map<ExecutorID, Executor*> executors;
boost::unordered_map<double, boost::unordered_map<TaskID, TaskStatus> > statuses;
@@ -181,10 +179,10 @@ struct Framework
// Periodically sends heartbeats to the master
-class Heart : public MesosProcess
+class Heart : public MesosProcess<Heart>
{
public:
- Heart(const PID &_master, const PID &_slave,
+ Heart(const process::UPID &_master, const process::UPID &_slave,
const SlaveID& _slaveId, double _interval)
: master(_master), slave(_slave), slaveId(_slaveId), interval(_interval) {}
@@ -209,14 +207,14 @@ protected:
}
private:
- const PID master;
- const PID slave;
+ const process::UPID master;
+ const process::UPID slave;
const SlaveID slaveId;
const double interval;
};
-class Slave : public MesosProcess
+class Slave : public MesosProcess<Slave>
{
public:
Slave(const Resources& resources, bool local,
@@ -229,7 +227,7 @@ public:
static void registerOptions(Configurator* conf);
- Result<state::SlaveState*> getState();
+ process::Promise<state::SlaveState*> getState();
// Callback used by isolation module to tell us when an executor exits.
void executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int result);
@@ -258,7 +256,7 @@ protected:
private:
Configuration conf;
- PID master;
+ process::UPID master;
Resources resources;
// Invariant: framework will exist if executor exists.
Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Sun Jun 5 09:15:51 2011
@@ -12,6 +12,8 @@
#include <Python.h>
+using process::PID;
+
using std::string;
@@ -20,7 +22,7 @@ extern "C" void init_slave(); // Initia
namespace mesos { namespace internal { namespace slave {
-static Slave* slave;
+static PID<Slave> slave;
static string webuiPort;
static string logDir;
static string workDir;
@@ -49,7 +51,7 @@ void* runSlaveWebUI(void*)
}
-void startSlaveWebUI(Slave* _slave, const Configuration &conf)
+void startSlaveWebUI(const PID<Slave>& _slave, const Configuration& conf)
{
// TODO(*): See the note in master/webui.cpp about having to
// determine default values. These should be set by now and can just
@@ -81,7 +83,7 @@ namespace state {
// From slave_state.hpp.
SlaveState* get_slave()
{
- return Process::call(slave, &Slave::getState);
+ return process::call(slave, &Slave::getState);
}
} // namespace state {
Modified: incubator/mesos/trunk/src/slave/webui.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.hpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.hpp (original)
+++ incubator/mesos/trunk/src/slave/webui.hpp Sun Jun 5 09:15:51 2011
@@ -12,9 +12,10 @@
namespace mesos { namespace internal { namespace slave {
-void startSlaveWebUI(Slave* slave, const Configuration& conf);
+void startSlaveWebUI(const process::PID<Slave>& slave,
+ const Configuration& conf);
-}}} // namespace
+}}} // namespace mesos { namespace internal { namespace slave {
#endif // MESOS_WEBUI
Modified: incubator/mesos/trunk/src/tests/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/main.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/main.cpp (original)
+++ incubator/mesos/trunk/src/tests/main.cpp Sun Jun 5 09:15:51 2011
@@ -7,6 +7,8 @@
#include <string>
+#include <process.hpp>
+
#include <common/fatal.hpp>
#include <configurator/configurator.hpp>
@@ -58,5 +60,9 @@ int main(int argc, char** argv)
testing::FLAGS_gtest_death_test_style = "threadsafe";
if (argc == 2 && strcmp("-v", argv[1]) == 0)
google::SetStderrLogging(google::INFO);
+
+ // Initialize libprocess library (but not glog, done above).
+ process::initialize(false);
+
return RUN_ALL_TESTS();
}
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 09:15:51 2011
@@ -5,6 +5,8 @@
#include <boost/lexical_cast.hpp>
+#include <detector/detector.hpp>
+
#include <local/local.hpp>
#include <master/master.hpp>
@@ -27,6 +29,8 @@ using mesos::internal::slave::Slave;
using mesos::internal::slave::ProcessBasedIsolationModule;
using mesos::internal::slave::STATUS_UPDATE_RETRY_TIMEOUT;
+using process::PID;
+
using std::string;
using std::map;
using std::vector;
@@ -105,7 +109,7 @@ TEST(MasterTest, ResourceOfferWithMultip
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
+ PID<Master> master = local::launch(10, 2, 1 * Gigabyte, false, false);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
@@ -151,7 +155,7 @@ TEST(MasterTest, ResourcesReofferedAfter
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
+ PID<Master> master = local::launch(10, 2, 1 * Gigabyte, false, false);
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, master);
@@ -216,7 +220,7 @@ TEST(MasterTest, ResourcesReofferedAfter
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
MockScheduler sched1;
MesosSchedulerDriver driver1(&sched1, master);
@@ -315,14 +319,14 @@ TEST(MasterTest, SlaveLost)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
ProcessBasedIsolationModule isolationModule;
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -361,7 +365,7 @@ TEST(MasterTest, SlaveLost)
EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
.WillOnce(Trigger(&slaveLostCall));
- Process::post(slave, TERMINATE);
+ process::post(slave, process::TERMINATE);
WAIT_UNTIL(offerRescindedCall);
WAIT_UNTIL(slaveLostCall);
@@ -369,10 +373,10 @@ TEST(MasterTest, SlaveLost)
driver.stop();
driver.join();
- Process::wait(slave);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
@@ -380,7 +384,7 @@ TEST(MasterTest, SchedulerFailover)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
// Launch the first (i.e., failing) scheduler and wait until
// registered gets called to launch the second (i.e., failover)
@@ -457,15 +461,15 @@ TEST(MasterTest, SlavePartitioned)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- Clock::pause();
+ process::Clock::pause();
MockFilter filter;
- Process::filter(&filter);
+ process::filter(&filter);
EXPECT_MSG(filter, _, _, _)
.WillRepeatedly(Return(false));
- PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, master);
@@ -490,12 +494,12 @@ TEST(MasterTest, SlavePartitioned)
EXPECT_CALL(sched, slaveLost(&driver, _))
.WillOnce(Trigger(&slaveLostCall));
- EXPECT_MSG(filter, Eq(MesosProcess::names[SH2M_HEARTBEAT]), _, _)
+ EXPECT_MSG(filter, Eq(names[SH2M_HEARTBEAT]), _, _)
.WillRepeatedly(Return(true));
driver.start();
- Clock::advance(master::HEARTBEAT_TIMEOUT);
+ process::Clock::advance(master::HEARTBEAT_TIMEOUT);
WAIT_UNTIL(slaveLostCall);
@@ -504,9 +508,9 @@ TEST(MasterTest, SlavePartitioned)
local::shutdown();
- Process::filter(NULL);
+ process::filter(NULL);
- Clock::resume();
+ process::Clock::resume();
}
@@ -515,7 +519,7 @@ TEST(MasterTest, TaskRunning)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -539,7 +543,7 @@ TEST(MasterTest, TaskRunning)
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -592,11 +596,11 @@ TEST(MasterTest, TaskRunning)
driver.stop();
driver.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
@@ -605,7 +609,7 @@ TEST(MasterTest, KillTask)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -634,7 +638,7 @@ TEST(MasterTest, KillTask)
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -694,11 +698,11 @@ TEST(MasterTest, KillTask)
driver.stop();
driver.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
@@ -706,16 +710,16 @@ TEST(MasterTest, SchedulerFailoverStatus
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- Clock::pause();
+ process::Clock::pause();
MockFilter filter;
- Process::filter(&filter);
+ process::filter(&filter);
EXPECT_MSG(filter, _, _, _)
.WillRepeatedly(Return(false));
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -739,7 +743,7 @@ TEST(MasterTest, SchedulerFailoverStatus
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -774,7 +778,7 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
- EXPECT_MSG(filter, Eq(MesosProcess::names[M2F_STATUS_UPDATE]), _, Ne(master))
+ EXPECT_MSG(filter, Eq(names[M2F_STATUS_UPDATE]), _, Ne(master))
.WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
.RetiresOnSaturation();
@@ -823,7 +827,7 @@ TEST(MasterTest, SchedulerFailoverStatus
WAIT_UNTIL(registeredCall);
- Clock::advance(STATUS_UPDATE_RETRY_TIMEOUT);
+ process::Clock::advance(STATUS_UPDATE_RETRY_TIMEOUT);
WAIT_UNTIL(statusUpdateCall);
@@ -833,15 +837,15 @@ TEST(MasterTest, SchedulerFailoverStatus
driver1.join();
driver2.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
- Process::filter(NULL);
+ process::filter(NULL);
- Clock::resume();
+ process::Clock::resume();
}
@@ -850,7 +854,7 @@ TEST(MasterTest, FrameworkMessage)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -884,7 +888,7 @@ TEST(MasterTest, FrameworkMessage)
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -967,11 +971,11 @@ TEST(MasterTest, FrameworkMessage)
schedDriver.stop();
schedDriver.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
@@ -980,7 +984,7 @@ TEST(MasterTest, SchedulerFailoverFramew
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -1006,7 +1010,7 @@ TEST(MasterTest, SchedulerFailoverFramew
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -1095,11 +1099,11 @@ TEST(MasterTest, SchedulerFailoverFramew
driver1.join();
driver2.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
@@ -1108,7 +1112,7 @@ TEST(MasterTest, MultipleExecutors)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
Master m;
- PID master = Process::spawn(&m);
+ PID<Master> master = process::spawn(&m);
Resources resources = Resources::parse("cpus:2;mem:1024");
@@ -1153,7 +1157,7 @@ TEST(MasterTest, MultipleExecutors)
TestingIsolationModule isolationModule(execs);
Slave s(resources, true, &isolationModule);
- PID slave = Process::spawn(&s);
+ PID<Slave> slave = process::spawn(&s);
BasicMasterDetector detector(master, slave, true);
@@ -1230,9 +1234,9 @@ TEST(MasterTest, MultipleExecutors)
driver.stop();
driver.join();
- Process::post(slave, TERMINATE);
- Process::wait(slave);
+ process::post(slave, process::TERMINATE);
+ process::wait(slave);
- Process::post(master, TERMINATE);
- Process::wait(master);
+ process::post(master, process::TERMINATE);
+ process::wait(master);
}
Modified: incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp Sun Jun 5 09:15:51 2011
@@ -27,6 +27,8 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::internal::slave::Framework;
+using process::PID;
+
using std::string;
using std::map;
using std::vector;
@@ -95,7 +97,7 @@ TEST(MasterTest, DuplicateTaskIdsInRespo
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
@@ -144,7 +146,7 @@ TEST(MasterTest, TooMuchMemoryInTask)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
@@ -188,7 +190,7 @@ TEST(MasterTest, TooMuchCpuInTask)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
@@ -232,7 +234,7 @@ TEST(MasterTest, ZeroCpuInTask)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
@@ -276,7 +278,7 @@ TEST(MasterTest, TooMuchMemoryAcrossTask
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
@@ -324,7 +326,7 @@ TEST(MasterTest, TooMuchCpuAcrossTasks)
ASSERT_TRUE(GTEST_IS_THREADSAFE);
DateUtils::setMockDate("200102030405");
- PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
+ PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
Resources resources;
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1132272&r1=1132271&r2=1132272&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Sun Jun 5 09:15:51 2011
@@ -99,10 +99,10 @@ public:
/**
* Definition of a mock Filter so that messages can act as triggers.
*/
-class MockFilter : public Filter
+class MockFilter : public process::Filter
{
public:
- MOCK_METHOD1(filter, bool(Message *));
+ MOCK_METHOD1(filter, bool(process::Message *));
};
@@ -113,8 +113,8 @@ public:
MATCHER_P3(MsgMatcher, name, from, to, "")
{
return (testing::Matcher<std::string>(name).Matches(arg->name) &&
- testing::Matcher<PID>(from).Matches(arg->from) &&
- testing::Matcher<PID>(to).Matches(arg->to));
+ testing::Matcher<process::UPID>(from).Matches(arg->from) &&
+ testing::Matcher<process::UPID>(to).Matches(arg->to));
}