You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:00 UTC

svn commit: r1131599 - in /incubator/mesos/trunk/src: ft_messaging.cpp ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp slave.cpp third_party/libprocess/tuple-impl.hpp

Author: benh
Date: Sun Jun  5 03:28:00 2011
New Revision: 1131599

URL: http://svn.apache.org/viewvc?rev=1131599&view=rev
Log:
SendFrameworkMessage is now reliable from framework to executors and vice versa. Libprocess modified to make pack public.

Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/ft_messaging.hpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 03:28:00 2011
@@ -75,7 +75,7 @@ void FTMessaging::sendOutstanding() {
     return;
   } 
 
-  foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
+  foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
     if (msg.count < FT_MAX_RESENDS) {
       DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
       Process::post(master, msg.id, msg.data.data(), msg.data.size());
@@ -117,6 +117,20 @@ bool FTMessaging::acceptMessage(string f
   }
 }
 
+bool FTMessaging::acceptMessageAck(string from, string ftId) {
+  bool res = acceptMessage(from, ftId);
+
+  if (!res)
+    LOG(WARNING) << "FT: asked called to ignore duplicate message " << ftId;
+  
+  DLOG(INFO) << "FT: Received message with id: " << ftId << " sending FT_RELAY_ACK";
+  
+  string msgStr = Tuple<EmptyClass>::tupleToString( Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from) );
+  Process::post(master, FT_RELAY_ACK, msgStr.data(), msgStr.size()); 
+
+  return res;
+}
+
 void FTMessaging::setMasterPid(const PID &mPid) {
   master = mPid;
 }

Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun  5 03:28:00 2011
@@ -62,12 +62,16 @@ using boost::unordered_set;
 class EmptyClass {
 };
 
-struct StoredMsg {
+/**
+ * Used in FTMessaging to store an unacked outgoing message together with its resend count, FT ID, and libprocess message ID.
+ * @see FTMessaging
+ */
+struct FTStoredMsg {
 
-  StoredMsg(const string &_ftId, const string &_data, const MSGID &_id) : 
+  FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) : 
     ftId(_ftId), data(_data), id(_id), count(0) {}
 
-  StoredMsg() : ftId(""), data(""), id(), count(0) {}
+  FTStoredMsg() : ftId(""), data(""), id(), count(0) {}
 
   string ftId;
   string data;
@@ -76,6 +80,10 @@ struct StoredMsg {
 };
 
 
+/**
+ * Singleton class that provides functionality for reliably sending messages, 
+ * resending them on timeout, acking received messages, and dropping duplicates.
+ */
 class FTMessaging {
 public:
   /**
@@ -107,7 +115,7 @@ public:
   {
     DLOG(INFO) << "FT: sending " << ftId;
     string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
-    StoredMsg sm(ftId, msgStr, ID);
+    FTStoredMsg sm(ftId, msgStr, ID);
     outMsgs[ftId] = sm;
     if (!master) {
       DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
@@ -136,12 +144,22 @@ public:
   bool acceptMessage(string from, string ftId);
 
   /**
+   * Same as acceptMessage, but also posts an FT_RELAY_ACK for the message to the master, whether or not the message is accepted (duplicates are acked again).
+   * @param from libprocess PID string representing the original sender of the message
+   * @param ftId the FT ID of the message
+   * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+   */
+  bool acceptMessageAck(string from, string ftId);
+
+  /**
    * @return a new unique FT ID for a message to be sent
    */ 
   string getNextId();
 
   /**
    * Sets the PID to the master (to be called when a new master comes up).
+   * Important invariant: needs to be called every time the master changes in slave/master/sched.
+   * @param mPid PID to the current master
    */
   void setMasterPid(const PID &mPid);
 
@@ -149,7 +167,7 @@ private:
 
   PID master;
 
-  unordered_map<string, StoredMsg> outMsgs;
+  unordered_map<string, FTStoredMsg> outMsgs;
 
   unordered_map<string, string> inMsgs;
 

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:28:00 2011
@@ -403,18 +403,21 @@ void Master::operator () ()
       break;
     }
 
-    case F2M_FRAMEWORK_MESSAGE: {
+    case F2M_FT_FRAMEWORK_MESSAGE: {
       FrameworkID fid;
       FrameworkMessage message;
-      unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
+      string ftId, senderStr;
+      unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
 	Slave *slave = lookupSlave(message.slaveId);
 	if (slave != NULL) {
 	  LOG(INFO) << "Sending framework message to " << slave;
-	  send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
-	}
-      }
+	  send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
+        } else
+          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
       break;
     }
 
@@ -467,13 +470,12 @@ void Master::operator () ()
     }
 
     case S2M_FT_STATUS_UPDATE: {
-      string ftId;
       SlaveID sid;
       FrameworkID fid;
       TaskID tid;
       TaskState state;
       string data;
-      string senderStr;
+      string ftId, senderStr;
 
       unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
       DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
@@ -485,19 +487,19 @@ void Master::operator () ()
           send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
 
           if (!ftMsg->acceptMessage(senderStr, ftId)) {
-            LOG(WARNING) << "Locally ignoring duplicate message with id:" << ftId;
-          } else {
-            // Update the task state locally
-            TaskInfo *task = slave->lookupTask(fid, tid);
-            if (task != NULL) {
-              LOG(INFO) << "Status update: " << task << " is in state " << state;
-              task->state = state;
-              // Remove the task if it finished or failed
-              if (state == TASK_FINISHED || state == TASK_FAILED ||
-                  state == TASK_KILLED || state == TASK_LOST) {
-                LOG(INFO) << "Removing " << task << " because it's done";
-                removeTask(task, TRR_TASK_ENDED);
-              }
+            LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+            break;
+          } 
+          // Update the task state locally
+          TaskInfo *task = slave->lookupTask(fid, tid);
+          if (task != NULL) {
+            LOG(INFO) << "Status update: " << task << " is in state " << state;
+            task->state = state;
+            // Remove the task if it finished or failed
+            if (state == TASK_FINISHED || state == TASK_FAILED ||
+                state == TASK_KILLED || state == TASK_LOST) {
+              LOG(INFO) << "Removing " << task << " because it's done";
+              removeTask(task, TRR_TASK_ENDED);
             }
           }
 	} else
@@ -532,11 +534,33 @@ void Master::operator () ()
 	    }
 	  }
 	} else
-          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
-      }
+          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
       break;
     }
       
+    case S2M_FT_FRAMEWORK_MESSAGE: {
+      SlaveID sid;
+      FrameworkID fid;
+      FrameworkMessage message; 
+      string ftId, senderStr;
+      unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
+      Slave *slave = lookupSlave(sid);
+      if (slave != NULL) {
+	Framework *framework = lookupFramework(fid);
+	if (framework != NULL) {
+
+	  send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
+
+        } else
+          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
+
+      break;
+    }
+
     case S2M_FRAMEWORK_MESSAGE: {
       SlaveID sid;
       FrameworkID fid;

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 03:28:00 2011
@@ -26,6 +26,7 @@ enum MessageType {
   F2M_REVIVE_OFFERS,
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
+  F2M_FT_FRAMEWORK_MESSAGE,
   
   /* From master to framework. */
   M2F_REGISTER_REPLY,
@@ -35,6 +36,7 @@ enum MessageType {
   M2F_FT_STATUS_UPDATE,
   M2F_LOST_SLAVE,
   M2F_FRAMEWORK_MESSAGE,
+  M2F_FT_FRAMEWORK_MESSAGE,
   M2F_ERROR,
   
   /* Used for FT. */
@@ -48,6 +50,7 @@ enum MessageType {
   S2M_STATUS_UPDATE,
   S2M_FT_STATUS_UPDATE,
   S2M_FRAMEWORK_MESSAGE,
+  S2M_FT_FRAMEWORK_MESSAGE,
   S2M_LOST_EXECUTOR,
 
   /* From slave heart to master. */
@@ -63,6 +66,7 @@ enum MessageType {
   M2S_KILL_TASK,
   M2S_KILL_FRAMEWORK,
   M2S_FRAMEWORK_MESSAGE,
+  M2S_FT_FRAMEWORK_MESSAGE,
   M2S_SHUTDOWN, // Used in unit tests to shut down cluster
 
   /* From executor to slave. */
@@ -143,6 +147,12 @@ TUPLE(F2M_FRAMEWORK_MESSAGE,
       (FrameworkID,
        FrameworkMessage));
 
+TUPLE(F2M_FT_FRAMEWORK_MESSAGE,
+      (std::string, /* FT ID */
+       std::string, /* original sender */
+       FrameworkID,
+       FrameworkMessage));
+
 TUPLE(M2F_REGISTER_REPLY,
       (FrameworkID));
 
@@ -171,6 +181,11 @@ TUPLE(M2F_LOST_SLAVE,
 TUPLE(M2F_FRAMEWORK_MESSAGE,
       (FrameworkMessage));
 
+TUPLE(M2F_FT_FRAMEWORK_MESSAGE,
+      (std::string, /* FT ID */
+       std::string, /* original sender */
+       FrameworkMessage));
+
 TUPLE(M2F_ERROR,
       (int32_t /*code*/,
        std::string /*msg*/));
@@ -224,6 +239,13 @@ TUPLE(S2M_FRAMEWORK_MESSAGE,
        FrameworkID,
        FrameworkMessage));
 
+TUPLE(S2M_FT_FRAMEWORK_MESSAGE,
+      (std::string, /* ftId */
+       std::string, /* sender PID */
+       SlaveID,
+       FrameworkID,
+       FrameworkMessage));
+
 TUPLE(S2M_LOST_EXECUTOR,
       (SlaveID,
        FrameworkID,
@@ -262,6 +284,12 @@ TUPLE(M2S_FRAMEWORK_MESSAGE,
       (FrameworkID,
        FrameworkMessage));
         
+TUPLE(M2S_FT_FRAMEWORK_MESSAGE,
+      (std::string, /* FT ID */
+       std::string, /* original sender */
+       FrameworkID,
+       FrameworkMessage));
+
 TUPLE(M2S_SHUTDOWN,
       ());
 

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 03:28:00 2011
@@ -156,6 +156,7 @@ protected:
     }
 
     link(master);
+    ftMsg->setMasterPid(master);
     send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
 
     while(true) {
@@ -197,19 +198,14 @@ protected:
       }
 
       case M2F_FT_STATUS_UPDATE: {
-        string ftId;
-        string origPid;
         TaskID tid;
         TaskState state;
         string data;
+        string ftId, origPid;
         unpack<M2F_FT_STATUS_UPDATE>(ftId, origPid, tid, state, data);
-        if (!ftMsg->acceptMessage(origPid, ftId)) {
-          LOG(WARNING) << "Dropping duplicate message with id:" << ftId;
+        if (!ftMsg->acceptMessageAck(origPid, ftId))
           break;
-        }
-        DLOG(INFO) << "Received fault tolerant message with id: " << ftId;
-        DLOG(INFO) << "Sending FT_RELAY_ACK for : " << ftId;
-        send(from(), pack<FT_RELAY_ACK>(ftId, origPid));
+        DLOG(INFO) << "FT: Received message with id: " << ftId;
 
         TaskStatus status(tid, state, data);
         invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
@@ -227,6 +223,20 @@ protected:
         break;
       }
 
+      case M2F_FT_FRAMEWORK_MESSAGE: {
+        FrameworkMessage msg;
+        string ftId, origPid;
+        unpack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, origPid, msg);
+
+        if (!ftMsg->acceptMessageAck(origPid, ftId))
+          break;
+
+        DLOG(INFO) << "FT: Received message with id: " << ftId;
+
+        invoke(bind(&Scheduler::frameworkMessage, sched, driver, ref(msg)));
+        break;
+      }
+
       case M2F_FRAMEWORK_MESSAGE: {
         FrameworkMessage msg;
         unpack<M2F_FRAMEWORK_MESSAGE>(msg);
@@ -269,6 +279,7 @@ protected:
 	
 	LOG(INFO) << "Connecting to Nexus master at " << master;
 	link(master);
+        ftMsg->setMasterPid(master);
 	send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
 	break;
       }
@@ -502,8 +513,12 @@ void NexusSchedulerDriver::sendFramework
     return;
   }
 
-  process->send(process->master,
-                process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
+  if (process->isFT) {
+    string ftId = process->ftMsg->getNextId();
+    process->ftMsg->reliableSend( ftId, process->pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, process->self(), process->fid, message));
+  } else
+    process->send(process->master,
+                  process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
 }
 
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 03:28:00 2011
@@ -254,6 +254,24 @@ void Slave::operator () ()
         break;
       }
 
+      case M2S_FT_FRAMEWORK_MESSAGE: {
+        string ftId, origPid;
+        unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
+
+        if (!ftMsg->acceptMessageAck(origPid, ftId))
+          break;
+
+        DLOG(INFO) << "FT: Received message with id: " << ftId;
+
+        if (Executor *ex = getExecutor(fid)) {
+          send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+        }
+        // TODO(matei): If executor is not started, queue framework message?
+        // (It's probably okay to just drop it since frameworks can have
+        // the executor send a message to the master to say when it's ready.)
+        break;
+      }
+
       case M2S_FRAMEWORK_MESSAGE: {
         unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
         if (Executor *ex = getExecutor(fid)) {
@@ -313,9 +331,11 @@ void Slave::operator () ()
           }
         }
         // Pass on the update to the master
-        string ftId = ftMsg->getNextId();
-        ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
-          //        send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+        if (isFT) {
+          string ftId = ftMsg->getNextId();
+          ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
+        } else
+          send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
         break;
       }
 
@@ -323,7 +343,11 @@ void Slave::operator () ()
         unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
         // Set slave ID in case framework omitted it
         message.slaveId = this->id;
-        send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
+        if (isFT) {
+          string ftId = ftMsg->getNextId();
+          ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
+        } else
+          send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
         break;
       }
 

Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun  5 03:28:00 2011
@@ -51,7 +51,7 @@ at(const tuple<ID> &r)
 template <typename P>
 class Tuple : public P
 {
-protected:
+public:
   template <MSGID ID>
   static tuple<ID> pack()
   {
@@ -163,6 +163,8 @@ protected:
     return tuple<ID>(::boost::make_tuple(t0, t1, t2, t3, t4, t5, t6, t7, t8, t9));
   }
 
+protected:
+
   template <MSGID ID>
   void unpack(typename field<0, ID>::type &t0)
   {