You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:53 UTC

svn commit: r1131606 - in /incubator/mesos/trunk/src: ft_messaging.cpp ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp slave.cpp

Author: benh
Date: Sun Jun  5 03:28:52 2011
New Revision: 1131606

URL: http://svn.apache.org/viewvc?rev=1131606&view=rev
Log:
Merged two messages into one (FT_ACK is folded into FT_RELAY_ACK) and removed a serious bug. Added more functionality to ft_messaging, which makes it more complicated, but we have to live with the complexity until we can embed messages inside messages in libprocess.

Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/ft_messaging.hpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/slave.cpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 03:28:52 2011
@@ -100,7 +100,7 @@ void FTMessaging::sendOutstanding() {
 }
 
 // Careful: not idempotent function.
-bool FTMessaging::acceptMessage(string from, string ftId) {
+bool FTMessaging::acceptMessage(string ftId, string from) {
   if (inMsgs.find(from)==inMsgs.end()) {
     DLOG(INFO) << "FT: new msgs seq: " << ftId;
     inMsgs[from] = ftId;
@@ -129,22 +129,26 @@ bool FTMessaging::acceptMessage(string f
   }
 }
 
-bool FTMessaging::acceptMessageAck(string from, string ftId) {
-  DLOG(INFO) << "FT: Received message with id: " << ftId << " sending FT_RELAY_ACK";
-
-  bool res = acceptMessage(from, ftId);
-
+bool FTMessaging::acceptMessageAckTo(PID to, string ftId, string from) {
+  DLOG(INFO) << "FT: Received msg with id: " << ftId << " sending FT_RELAY_ACK to " << to;
+  // acceptMessage() takes (ftId, from) as of this revision and is not idempotent: call it once.
+  bool res = acceptMessage(ftId, from);
+  // Messages rejected by acceptMessage() are reported to the caller and are not acked.
   if (!res) {
     LOG(WARNING) << "FT: asked caller to ignore duplicate message " << ftId;
     return res;
   }  
-
+  // The ack carries the FT ID and the original sender's PID string.
   string msgStr = Tuple<EmptyClass>::tupleToString( Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from) );
-  Process::post(master, FT_RELAY_ACK, msgStr.data(), msgStr.size()); 
-
+  Process::post(to, FT_RELAY_ACK, msgStr.data(), msgStr.size()); 
+  // Only reached when the message was accepted, so res is true here.
   return res;
 }
 
+bool FTMessaging::acceptMessageAck(string ftId, string from) {
+  return acceptMessageAckTo(master, ftId, from);
+}
+
 void FTMessaging::setMasterPid(const PID &mPid) {
   master = mPid;
 }

Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun  5 03:28:52 2011
@@ -152,19 +152,28 @@ public:
 
   /**
    * Checks if a message with FT ID from a node has already been received previously. 
-   * @param from libprocess PID string representing the original sender of the message
    * @param ftId the FT ID of the message
+   * @param from libprocess PID string representing the original sender of the message
    * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
    */
-  bool acceptMessage(string from, string ftId);
+  bool acceptMessage(string ftId, string from);
 
   /**
    * Same as acceptMessage, but also sends an ACK back to the original sender if it returns true.
+   * @param ftId the FT ID of the message
    * @param from libprocess PID string representing the original sender of the message
+   * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+   */
+  bool acceptMessageAck(string ftId, string from);
+
+  /**
+   * Same as acceptMessageAck, but explicitly specifies the pid of the node that should receive the ack.
+   * @param to PID of the node that should receive the ack.
    * @param ftId the FT ID of the message
+   * @param from libprocess PID string representing the original sender of the message
    * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
    */
-  bool acceptMessageAck(string from, string ftId);
+  bool acceptMessageAckTo(PID to, string ftId, string from);
 
   /**
    * @return a new unique FT ID for a message to be sent

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:28:52 2011
@@ -360,7 +360,10 @@ void Master::operator () ()
       Params params;
       string ftId, senderStr;
       unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
-      if (!ftMsg->acceptMessageAck(senderStr, ftId)) {
+      PID senderPid;
+      istringstream ss(senderStr);
+      ss >> senderPid;
+      if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
         LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
         break;
       } 
@@ -658,7 +661,7 @@ void Master::operator () ()
       string origPidStr;
       unpack<FT_RELAY_ACK>(ftId, origPidStr);
       
-      DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " FT_ACK sent to " << origPidStr;
+      DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
             
       PID origPid;
       istringstream iss(origPidStr);
@@ -667,7 +670,7 @@ void Master::operator () ()
         break;
       }
 
-      send(origPid, pack<FT_ACK>(ftId));
+      send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
 
       break;
     }

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 03:28:52 2011
@@ -200,18 +200,11 @@ TUPLE(M2F_ERROR,
        std::string /*msg*/));
 
 
-
 TUPLE(FT_RELAY_ACK,
       (std::string, /* FT ID */
        std::string /* PID of orig */
        ));
 
-TUPLE(FT_ACK,
-      (std::string /* FT ID */
-       ));
-
-
-
 TUPLE(S2M_REGISTER_SLAVE,
       (std::string /*name*/,
        std::string /*publicDns*/,

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 03:28:52 2011
@@ -228,7 +228,7 @@ protected:
         string data;
         string ftId, origPid;
         unpack<M2F_FT_STATUS_UPDATE>(ftId, origPid, tid, state, data);
-        if (!ftMsg->acceptMessageAck(origPid, ftId))
+        if (!ftMsg->acceptMessageAck(ftId, origPid))
           break;
         DLOG(INFO) << "FT: Received message with id: " << ftId;
 
@@ -254,7 +254,7 @@ protected:
         string ftId, origPid;
         unpack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, origPid, msg);
 
-        if (!ftMsg->acceptMessageAck(origPid, ftId))
+        if (!ftMsg->acceptMessageAck(ftId, origPid))
           break;
 
         DLOG(INFO) << "FT: Received message with id: " << ftId;
@@ -288,8 +288,13 @@ protected:
 
       case PROCESS_EXIT: {
         const char* message = "Connection to master failed";
-	LOG(INFO) << message;
-	//        invoke(bind(&Scheduler::error, sched, driver, -1, message));
+	 if (isFT) 
+	   LOG(WARNING) << "Connection to master failed. Waiting for a new master to be elected.";
+	 else 
+	   {
+	      LOG(ERROR) << "Connection to master failed. Exiting. Consider running Nexus in FT mode!";
+	      invoke(bind(&Scheduler::error, sched, driver, -1, message));
+	   }
         break;
       }
 
@@ -309,10 +314,22 @@ protected:
 	send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
 	break;
       }
+
+      case FT_RELAY_ACK: {
+        string ftId, senderStr;
+        unpack<FT_RELAY_ACK>(ftId, senderStr);
+
+        DLOG(INFO) << "FT: got final ack for " << ftId;
+
+        ftMsg->gotAck(ftId);
+        break;
+      }
+
       case PROCESS_TIMEOUT: {
         ftMsg->sendOutstanding();
 	break;
       }
+
       default: {
         ostringstream oss;
         oss << "SchedulerProcess received unknown message " << msgid()
@@ -515,6 +532,7 @@ void NexusSchedulerDriver::replyToOffer(
       new SchedulerProcess::TimeoutListener(process, tasks);
 
     string ftId = process->ftMsg->getNextId();
+    DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
     process->ftMsg->reliableSend( ftId,
                                   process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
                                                                          process->self(),

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 03:28:52 2011
@@ -258,7 +258,7 @@ void Slave::operator () ()
         string ftId, origPid;
         unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
 
-        if (!ftMsg->acceptMessageAck(origPid, ftId))
+        if (!ftMsg->acceptMessageAck(ftId, origPid))
           break;
 
         DLOG(INFO) << "FT: Received message with id: " << ftId;
@@ -361,12 +361,16 @@ void Slave::operator () ()
 
         if (from() == master) {
 	  // TODO: Fault tolerance!
-          LOG(ERROR) << "Master disconnected! Committing suicide ...";
-	  // TODO(matei): Add support for factory style destroy of objects!
-	  // if (isolationModule != NULL)
-	  //   delete isolationModule;
-	  // // TODO: Shut down executors?
-	  // return;
+	   if (isFT)
+	     LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected."; 
+	   else 
+	     {
+		LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
+		if (isolationModule != NULL)
+		  delete isolationModule;
+		// TODO: Shut down executors?
+		return;
+	     }
 	}
 
         foreachpair (_, Executor *ex, executors) {
@@ -436,9 +440,9 @@ void Slave::operator () ()
 	break;
       }
     
-      case FT_ACK: {
-        string ftId;
-        unpack<FT_ACK>(ftId);
+      case FT_RELAY_ACK: {
+        string ftId, senderStr;
+        unpack<FT_RELAY_ACK>(ftId, senderStr);
 
         DLOG(INFO) << "FT: got final ack for " << ftId;