You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:26:12 UTC

svn commit: r1131757 - in /incubator/mesos/trunk/src: ft_messaging.cpp leader_detector.hpp messages.hpp nexus_sched.cpp task_info.hpp url_processor.cpp

Author: benh
Date: Sun Jun  5 05:26:12 2011
New Revision: 1131757

URL: http://svn.apache.org/viewvc?rev=1131757&view=rev
Log:
more todos done

Removed:
    incubator/mesos/trunk/src/task_info.hpp
Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/leader_detector.hpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/url_processor.cpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 05:26:12 2011
@@ -42,7 +42,7 @@ FTMessaging *FTMessaging::getInstance(co
 FTMessaging::FTMessaging(const PID &_master) : 
   master(_master), msgId(0)
 { 
-  srand( time(0) );
+  srand(time(0));
   char s[50];
   sprintf(s, "%09i", (int)rand());
   uniqPrefix = s;
@@ -81,7 +81,7 @@ void FTMessaging::sendOutstanding() {
     return;
   } 
 
-  foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
+  foreachpair(const string &ftId, struct FTStoredMsg &msg, outMsgs) {
     if (msg.callback != NULL) {
       DLOG(INFO) << "FT: calling timeout listener";
       msg.callback->timeout();
@@ -108,7 +108,7 @@ bool FTMessaging::acceptMessage(const st
     string oldSeq = inMsgs[from]; 
     string oldRnd = oldSeq;
     int pos;
-    if ( (pos = oldSeq.find_last_of(':')) != string::npos ) {  
+    if ((pos = oldSeq.find_last_of(':')) != string::npos) {  
       oldSeq.erase(0, pos + 1);
       oldRnd.erase(pos, 255);
       long seqNr = lexical_cast<long>(oldSeq);
@@ -138,7 +138,7 @@ bool FTMessaging::acceptMessageAckTo(con
     return res;
   }  
   
-  string msgStr = Tuple<EmptyClass>::tupleToString( Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from) );
+  string msgStr = Tuple<EmptyClass>::tupleToString(Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from));
   Process::post(to, FT_RELAY_ACK, msgStr.data(), msgStr.size()); 
   
   return res;

Modified: incubator/mesos/trunk/src/leader_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/leader_detector.hpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/leader_detector.hpp (original)
+++ incubator/mesos/trunk/src/leader_detector.hpp Sun Jun  5 05:26:12 2011
@@ -72,7 +72,7 @@ public:
    * @param quiet true makes ZK quiet, whereas false makes ZK output DEBUG messages
    */ 
   static void setQuiet(bool quiet) {
-    zoo_set_debug_level( quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG );
+    zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
   }
 
 private: 
@@ -82,7 +82,7 @@ private: 
 
   void setMySeq(string seq) {  // converts "/nxmaster/000000131" to "000000131"
     int pos;
-    if ( (pos = seq.find_last_of('/')) != string::npos ) {  
+    if ((pos = seq.find_last_of('/')) != string::npos) {  
       mySeq = seq.erase(0,pos+1);
     } else
       mySeq = "";

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 05:26:12 2011
@@ -233,7 +233,7 @@ TUPLE(S2M_REREGISTER_SLAVE,
        std::string /*name*/,
        std::string /*publicDns*/,
        Resources,
-       std::vector<Task> ));
+       std::vector<Task>));
 
 TUPLE(S2M_UNREGISTER_SLAVE,
       (SlaveID));

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:26:12 2011
@@ -91,8 +91,8 @@ private:
    virtual void timeout() {
       foreach (const TaskDescription &t, tasks) {
         DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
-        parent->send( parent->self(), 
-                      pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+        parent->send(parent->self(), 
+                     pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
       }
     }
 
@@ -269,7 +269,7 @@ protected:
         unpack<F2F_FRAMEWORK_MESSAGE>(msg);
 //         if (isFT) {
 //           string ftId = ftMsg->getNextId();
-//           ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+//           ftMsg->reliableSend(ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
 //         } else
 //           send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
         send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
@@ -467,44 +467,53 @@ NexusSchedulerDriver::~NexusSchedulerDri
 }
 
 
-int NexusSchedulerDriver::start()
+void NexusSchedulerDriver::start()
 {
   Lock lock(&mutex);
 
   if (running) {
-    //error(1, "cannot call start - scheduler is already running");
-    return - 1;
+    error(1, "cannot call start - scheduler is already running");
+    return;
   }
 
+  PID pid;
+  
+  string initServer = master;
+
   if (master == string("localquiet")) {
     // TODO(benh): Look up resources in environment variables.
-    master = run_nexus(1, 1, 1073741824, true, true);
+    pid = run_nexus(1, 1, 1073741824, true, true);
+    ostringstream ss;
+    ss << pid;
+    initServer = ss.str();
   } else if (master == string("local")) {
     // TODO(benh): Look up resources in environment variables.
-    master = run_nexus(1, 1, 1073741824, true, false);
+    pid = run_nexus(1, 1, 1073741824, true, false);
+    ostringstream ss;
+    ss << pid;
+    initServer = ss.str();
   } 
 
   const string& frameworkName = sched->getFrameworkName(this);
   const ExecutorInfo& executorInfo = sched->getExecutorInfo(this);
 
-  process = new SchedulerProcess(master, this, sched, frameworkName, executorInfo);
+  process = new SchedulerProcess(initServer, this, sched, frameworkName, executorInfo);
   
   Process::spawn(process);
 
   running = true;
-
-  return 0;
 }
 
 
 
-int NexusSchedulerDriver::stop()
+void NexusSchedulerDriver::stop()
 {
   Lock lock(&mutex);
 
   if (!running) {
     // Don't issue an error (could lead to an infinite loop).
-    return -1;
+    // TODO(benh): It would be much cleaner to return success or failure!
+    return;
   }
 
   // TODO(benh): Do a Process::post instead?
@@ -516,111 +525,97 @@ int NexusSchedulerDriver::stop()
   running = false;
 
   pthread_cond_signal(&cond);
-
-  return 0;
 }
 
 
-int NexusSchedulerDriver::join()
+void NexusSchedulerDriver::join()
 {
   Lock lock(&mutex);
   while (running)
     pthread_cond_wait(&cond, &mutex);
-
-  return 0;
 }
 
 
-int NexusSchedulerDriver::run()
+void NexusSchedulerDriver::run()
 {
-  int ret = start();
-  return ret != 0 ? ret : join();
+  start();
+  join();
 }
 
 
-int NexusSchedulerDriver::killTask(TaskID tid)
+void NexusSchedulerDriver::killTask(TaskID tid)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    //error(1, "cannot call killTask - scheduler is not running");
-    return -1;
+    error(1, "cannot call killTask - scheduler is not running");
+    return;
   }
 
   // TODO(benh): Do a Process::post instead?
 
   process->send(process->master,
                 process->pack<F2M_KILL_TASK>(process->fid, tid));
-
-  return 0;
 }
 
 
-int NexusSchedulerDriver::replyToOffer(OfferID offerId,
-				       const vector<TaskDescription> &tasks,
-				       const string_map &params)
+void NexusSchedulerDriver::replyToOffer(OfferID offerId,
+                                        const vector<TaskDescription> &tasks,
+                                        const string_map &params)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    //error(1, "cannot call replyToOffer - scheduler is not running");
-    return -1;
+    error(1, "cannot call replyToOffer - scheduler is not running");
+    return;
   }
 
   // TODO(benh): Do a Process::post instead?
-  process->send( process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
-
-  return 0;
+  
+  process->send(process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
 }
 
 
-int NexusSchedulerDriver::reviveOffers()
+void NexusSchedulerDriver::reviveOffers()
 {
   Lock lock(&mutex);
 
   if (!running) {
-    //error(1, "cannot call reviveOffers - scheduler is not running");
-    return -1;
+    error(1, "cannot call reviveOffers - scheduler is not running");
+    return;
   }
 
   // TODO(benh): Do a Process::post instead?
 
   process->send(process->master,
                 process->pack<F2M_REVIVE_OFFERS>(process->fid));
-
-  return 0;
 }
 
 
-int NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+void NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    //error(1, "cannot call sendFrameworkMessage - scheduler is not running");
-    return -1;
+    error(1, "cannot call sendFrameworkMessage - scheduler is not running");
+    return;
   }
 
-  // TODO(benh): Do a Process::post instead?
-
-  process->send( process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message) );
-
-  return 0;
+  process->send(process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message));
 }
 
 
-int NexusSchedulerDriver::sendHints(const string_map& hints)
+void NexusSchedulerDriver::sendHints(const string_map& hints)
 {
   Lock lock(&mutex);
 
   if (!running) {
-    //error(1, "cannot call sendHints - scheduler is not running");
-    return -1;
+    error(1, "cannot call sendHints - scheduler is not running");
+    return;
   }
 
   // TODO(*): Send the hints; for now, we do nothing
-  //error(1, "sendHints is not yet implemented");
-  return -1;
+  error(1, "sendHints is not yet implemented");
 }
 
 

Modified: incubator/mesos/trunk/src/url_processor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/url_processor.cpp?rev=1131757&r1=1131756&r2=1131757&view=diff
==============================================================================
--- incubator/mesos/trunk/src/url_processor.cpp (original)
+++ incubator/mesos/trunk/src/url_processor.cpp Sun Jun  5 05:26:12 2011
@@ -43,7 +43,7 @@ pair<UrlProcessor::URLType, string> UrlP
     
   } else if (urlCap.find("ZOOFILE://") == 0) {
     
-    string zoos = parseZooFile( url.substr(10,1024) );
+    string zoos = parseZooFile(url.substr(10,1024));
     
     return pair<UrlProcessor::URLType, string>(UrlProcessor::ZOO, zoos);