You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:26:12 UTC
svn commit: r1131757 - in /incubator/mesos/trunk/src: ft_messaging.cpp
leader_detector.hpp messages.hpp nexus_sched.cpp task_info.hpp
url_processor.cpp
Author: benh
Date: Sun Jun 5 05:26:12 2011
New Revision: 1131757
URL: http://svn.apache.org/viewvc?rev=1131757&view=rev
Log:
more todos done
Removed:
incubator/mesos/trunk/src/task_info.hpp
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/leader_detector.hpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/url_processor.cpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 05:26:12 2011
@@ -42,7 +42,7 @@ FTMessaging *FTMessaging::getInstance(co
FTMessaging::FTMessaging(const PID &_master) :
master(_master), msgId(0)
{
- srand( time(0) );
+ srand(time(0));
char s[50];
sprintf(s, "%09i", (int)rand());
uniqPrefix = s;
@@ -81,7 +81,7 @@ void FTMessaging::sendOutstanding() {
return;
}
- foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
+ foreachpair(const string &ftId, struct FTStoredMsg &msg, outMsgs) {
if (msg.callback != NULL) {
DLOG(INFO) << "FT: calling timeout listener";
msg.callback->timeout();
@@ -108,7 +108,7 @@ bool FTMessaging::acceptMessage(const st
string oldSeq = inMsgs[from];
string oldRnd = oldSeq;
int pos;
- if ( (pos = oldSeq.find_last_of(':')) != string::npos ) {
+ if ((pos = oldSeq.find_last_of(':')) != string::npos) {
oldSeq.erase(0, pos + 1);
oldRnd.erase(pos, 255);
long seqNr = lexical_cast<long>(oldSeq);
@@ -138,7 +138,7 @@ bool FTMessaging::acceptMessageAckTo(con
return res;
}
- string msgStr = Tuple<EmptyClass>::tupleToString( Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from) );
+ string msgStr = Tuple<EmptyClass>::tupleToString(Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from));
Process::post(to, FT_RELAY_ACK, msgStr.data(), msgStr.size());
return res;
Modified: incubator/mesos/trunk/src/leader_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/leader_detector.hpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/leader_detector.hpp (original)
+++ incubator/mesos/trunk/src/leader_detector.hpp Sun Jun 5 05:26:12 2011
@@ -72,7 +72,7 @@ public:
* @param quiet true makes ZK quiet, whereas false makes ZK output DEBUG messages
*/
static void setQuiet(bool quiet) {
- zoo_set_debug_level( quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG );
+ zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
}
private:
@@ -82,7 +82,7 @@ private:
void setMySeq(string seq) { // converts "/nxmaster/000000131" to "000000131"
int pos;
- if ( (pos = seq.find_last_of('/')) != string::npos ) {
+ if ((pos = seq.find_last_of('/')) != string::npos) {
mySeq = seq.erase(0,pos+1);
} else
mySeq = "";
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 05:26:12 2011
@@ -233,7 +233,7 @@ TUPLE(S2M_REREGISTER_SLAVE,
std::string /*name*/,
std::string /*publicDns*/,
Resources,
- std::vector<Task> ));
+ std::vector<Task>));
TUPLE(S2M_UNREGISTER_SLAVE,
(SlaveID));
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:26:12 2011
@@ -91,8 +91,8 @@ private:
virtual void timeout() {
foreach (const TaskDescription &t, tasks) {
DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
- parent->send( parent->self(),
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ parent->send(parent->self(),
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
}
}
@@ -269,7 +269,7 @@ protected:
unpack<F2F_FRAMEWORK_MESSAGE>(msg);
// if (isFT) {
// string ftId = ftMsg->getNextId();
-// ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+// ftMsg->reliableSend(ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
// } else
// send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
@@ -467,44 +467,53 @@ NexusSchedulerDriver::~NexusSchedulerDri
}
-int NexusSchedulerDriver::start()
+void NexusSchedulerDriver::start()
{
Lock lock(&mutex);
if (running) {
- //error(1, "cannot call start - scheduler is already running");
- return - 1;
+ error(1, "cannot call start - scheduler is already running");
+ return;
}
+ PID pid;
+
+ string initServer = master;
+
if (master == string("localquiet")) {
// TODO(benh): Look up resources in environment variables.
- master = run_nexus(1, 1, 1073741824, true, true);
+ pid = run_nexus(1, 1, 1073741824, true, true);
+ ostringstream ss;
+ ss << pid;
+ initServer = ss.str();
} else if (master == string("local")) {
// TODO(benh): Look up resources in environment variables.
- master = run_nexus(1, 1, 1073741824, true, false);
+ pid = run_nexus(1, 1, 1073741824, true, false);
+ ostringstream ss;
+ ss << pid;
+ initServer = ss.str();
}
const string& frameworkName = sched->getFrameworkName(this);
const ExecutorInfo& executorInfo = sched->getExecutorInfo(this);
- process = new SchedulerProcess(master, this, sched, frameworkName, executorInfo);
+ process = new SchedulerProcess(initServer, this, sched, frameworkName, executorInfo);
Process::spawn(process);
running = true;
-
- return 0;
}
-int NexusSchedulerDriver::stop()
+void NexusSchedulerDriver::stop()
{
Lock lock(&mutex);
if (!running) {
// Don't issue an error (could lead to an infinite loop).
- return -1;
+ // TODO(benh): It would be much cleaner to return success or failure!
+ return;
}
// TODO(benh): Do a Process::post instead?
@@ -516,111 +525,97 @@ int NexusSchedulerDriver::stop()
running = false;
pthread_cond_signal(&cond);
-
- return 0;
}
-int NexusSchedulerDriver::join()
+void NexusSchedulerDriver::join()
{
Lock lock(&mutex);
while (running)
pthread_cond_wait(&cond, &mutex);
-
- return 0;
}
-int NexusSchedulerDriver::run()
+void NexusSchedulerDriver::run()
{
- int ret = start();
- return ret != 0 ? ret : join();
+ start();
+ join();
}
-int NexusSchedulerDriver::killTask(TaskID tid)
+void NexusSchedulerDriver::killTask(TaskID tid)
{
Lock lock(&mutex);
if (!running) {
- //error(1, "cannot call killTask - scheduler is not running");
- return -1;
+ error(1, "cannot call killTask - scheduler is not running");
+ return;
}
// TODO(benh): Do a Process::post instead?
process->send(process->master,
process->pack<F2M_KILL_TASK>(process->fid, tid));
-
- return 0;
}
-int NexusSchedulerDriver::replyToOffer(OfferID offerId,
- const vector<TaskDescription> &tasks,
- const string_map ¶ms)
+void NexusSchedulerDriver::replyToOffer(OfferID offerId,
+ const vector<TaskDescription> &tasks,
+ const string_map ¶ms)
{
Lock lock(&mutex);
if (!running) {
- //error(1, "cannot call replyToOffer - scheduler is not running");
- return -1;
+ error(1, "cannot call replyToOffer - scheduler is not running");
+ return;
}
// TODO(benh): Do a Process::post instead?
- process->send( process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
-
- return 0;
+
+ process->send(process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
}
-int NexusSchedulerDriver::reviveOffers()
+void NexusSchedulerDriver::reviveOffers()
{
Lock lock(&mutex);
if (!running) {
- //error(1, "cannot call reviveOffers - scheduler is not running");
- return -1;
+ error(1, "cannot call reviveOffers - scheduler is not running");
+ return;
}
// TODO(benh): Do a Process::post instead?
process->send(process->master,
process->pack<F2M_REVIVE_OFFERS>(process->fid));
-
- return 0;
}
-int NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+void NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
{
Lock lock(&mutex);
if (!running) {
- //error(1, "cannot call sendFrameworkMessage - scheduler is not running");
- return -1;
+ error(1, "cannot call sendFrameworkMessage - scheduler is not running");
+ return;
}
- // TODO(benh): Do a Process::post instead?
-
- process->send( process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message) );
-
- return 0;
+ process->send(process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message));
}
-int NexusSchedulerDriver::sendHints(const string_map& hints)
+void NexusSchedulerDriver::sendHints(const string_map& hints)
{
Lock lock(&mutex);
if (!running) {
- //error(1, "cannot call sendHints - scheduler is not running");
- return -1;
+ error(1, "cannot call sendHints - scheduler is not running");
+ return;
}
// TODO(*): Send the hints; for now, we do nothing
- //error(1, "sendHints is not yet implemented");
- return -1;
+ error(1, "sendHints is not yet implemented");
}
Modified: incubator/mesos/trunk/src/url_processor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/url_processor.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/url_processor.cpp (original)
+++ incubator/mesos/trunk/src/url_processor.cpp Sun Jun 5 05:26:12 2011
@@ -43,7 +43,7 @@ pair<UrlProcessor::URLType, string> UrlP
} else if (urlCap.find("ZOOFILE://") == 0) {
- string zoos = parseZooFile( url.substr(10,1024) );
+ string zoos = parseZooFile(url.substr(10,1024));
return pair<UrlProcessor::URLType, string>(UrlProcessor::ZOO, zoos);