You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:26:48 UTC

svn commit: r1131762 - in /incubator/mesos/trunk/src: ft_messaging.cpp ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp

Author: benh
Date: Sun Jun  5 05:26:48 2011
New Revision: 1131762

URL: http://svn.apache.org/viewvc?rev=1131762&view=rev
Log:
Redesigned to handle timeouts for ReplyToOffers. Still needs to be tested.

Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/ft_messaging.hpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 05:26:48 2011
@@ -70,8 +70,6 @@ void FTMessaging::gotAck(const string &f
 
 void FTMessaging::deleteMessage(const string &ftId) {
   struct FTStoredMsg &msg = outMsgs[ftId];
-  if (msg.callback != NULL)
-    delete msg.callback;     // ugly and sad. shared_ptr would have been better
   outMsgs.erase(ftId);
 }
 
@@ -82,11 +80,7 @@ void FTMessaging::sendOutstanding() {
   } 
 
   foreachpair(const string &ftId, struct FTStoredMsg &msg, outMsgs) {
-    if (msg.callback != NULL) {
-      DLOG(INFO) << "FT: calling timeout listener";
-      msg.callback->timeout();
-      deleteMessage(ftId);
-    } else if (msg.count < FT_MAX_RESENDS) {
+    if (msg.count < FT_MAX_RESENDS) {
       DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
       Process::post(master, msg.id, msg.data.data(), msg.data.size());
       msg.count++;

Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun  5 05:26:48 2011
@@ -29,33 +29,21 @@ class EmptyClass {
 
 
 /**
- * Interface used to signal that a message has timed out. 
- */
-class FTCallback {
-public:
-  /**
-   * Called from sendOutstanding() if a message has a callback object and it has timed out.
-   */
-  virtual void timeout() = 0;
-};
-
-/**
  * Used in FTMessaging to store unacked messages, their count, ftId, libprocess id.
  * @see FTMessaging
  */
 struct FTStoredMsg {
 
-  FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id, FTCallback *_cb = NULL) : 
-    ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)), callback(_cb) {}
+  FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) : 
+    ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)) {}
 
-  FTStoredMsg(bool _cb = false) : ftId(""), data(""), id(), count(1), ts(time(0)), callback(NULL) {}
+  FTStoredMsg(bool _cb = false) : ftId(""), data(""), id(), count(1), ts(time(0)) {}
 
   string ftId;
   string data;
   MSGID id;
   long count;
   time_t ts;   // not currently used
-  FTCallback *callback;
 };
 
 /**
@@ -88,14 +76,13 @@ public:
    * @see getNextId().
    * @param ftId string representing the unique FT id of the message
    * @param msgTuple libprocess tuple<ID> 
-   * @param FTCallback if not null, then FTCallback will be called by sendOutstanding()
    */
-  template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple, FTCallback *callback = NULL)
+  template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
   {
     DLOG(INFO) << "FT: sending " << ftId;
     string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
 
-    outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID, callback);
+    outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID);
     if (!master) {
       DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
       return;

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:26:48 2011
@@ -395,39 +395,6 @@ void Master::operator () ()
       break;
     }
 
-    case F2M_FT_SLOT_OFFER_REPLY: {
-      FrameworkID fid;
-      OfferID oid;
-      vector<TaskDescription> tasks;
-      Params params;
-      string ftId, senderStr;
-      unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
-      PID senderPid = make_pid(senderStr.c_str());
-      if (!senderPid) {
-        LOG(ERROR) << "Couldn't create PID out of sender string";
-      }
-      if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
-        LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
-        break;
-      } 
-      Framework *framework = lookupFramework(fid);
-      if (framework != NULL) {
-	SlotOffer *offer = lookupSlotOffer(oid);
-	if (offer != NULL) {
-	  processOfferReply(offer, tasks, params);
-	} else {
-	  // The slot offer is gone, meaning that we rescinded it or that
-	  // the slave was lost; immediately report any tasks in it as lost
-	  foreach (TaskDescription &t, tasks) {
-	    send(framework->pid,
-		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
-	  }
-	}
-      } else
-        DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
-      break;
-    }
-
     case F2M_SLOT_OFFER_REPLY: {
       FrameworkID fid;
       OfferID oid;

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 05:26:48 2011
@@ -24,7 +24,6 @@ enum MessageType {
   F2M_REREGISTER_FRAMEWORK,
   F2M_UNREGISTER_FRAMEWORK,
   F2M_SLOT_OFFER_REPLY,
-  F2M_FT_SLOT_OFFER_REPLY,
   F2M_REVIVE_OFFERS,
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
@@ -32,6 +31,7 @@ enum MessageType {
 
   F2F_SLOT_OFFER_REPLY,
   F2F_FRAMEWORK_MESSAGE,
+  F2F_TASK_RUNNING_STATUS,
   
   /* From master to framework. */
   M2F_REGISTER_REPLY,
@@ -144,14 +144,6 @@ TUPLE(F2M_SLOT_OFFER_REPLY,
        std::vector<TaskDescription>,
        Params));
 
-TUPLE(F2M_FT_SLOT_OFFER_REPLY,
-      (std::string, /* FT ID */
-       std::string, /* original sender */
-       FrameworkID,
-       OfferID,
-       std::vector<TaskDescription>,
-       Params));
-
 TUPLE(F2M_REVIVE_OFFERS,
       (FrameworkID));
 
@@ -178,7 +170,8 @@ TUPLE(F2F_SLOT_OFFER_REPLY,
 TUPLE(F2F_FRAMEWORK_MESSAGE,
       (FrameworkMessage));
 
-
+TUPLE(F2F_TASK_RUNNING_STATUS,
+      ());
 
 TUPLE(M2F_REGISTER_REPLY,
       (FrameworkID));

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:26:48 2011
@@ -57,7 +57,42 @@ namespace nexus { namespace internal {
  * any synchronization necessary is performed.
  */
 
-    
+class ReliableReply : public Tuple<Process>
+{    
+public:
+  ReliableReply(const PID &_p, const TaskID &_tid) : 
+    parent(_p), tid(_tid), terminate(false) {}
+  
+protected:
+  void operator () ()
+  {
+    link(parent);
+    while(!terminate) {
+
+      switch(receive(FT_TIMEOUT)) {
+      case F2F_TASK_RUNNING_STATUS: {
+        terminate = true;
+        break;
+      }
+
+      case PROCESS_TIMEOUT: {
+        terminate = true;
+        DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
+        send(parent, 
+             pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
+        break;
+      }
+
+      }
+    }
+    DLOG(INFO) << "FT: Exiting reliable reply for tid:" << tid;
+  }
+
+private:
+  bool terminate;
+  const PID parent;
+  const TaskID tid;
+};
 
 class SchedulerProcess : public Tuple<Process>
 {
@@ -81,25 +116,7 @@ private:
 
   volatile bool terminate;
 
-  class TimeoutListener;
-  friend class TimeoutListener;
-
-  class TimeoutListener : public FTCallback {
-  public:
-    TimeoutListener(SchedulerProcess *s, const vector<TaskDescription> t) : parent(s), tasks(t) {}
- 
-   virtual void timeout() {
-      foreach (const TaskDescription &t, tasks) {
-        DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
-        parent->send(parent->self(), 
-                     pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
-      }
-    }
-
-  private:
-    SchedulerProcess *parent;
-    vector<TaskDescription> tasks;
-  };
+  unordered_map<TaskID, ReliableReply *> reliableReplies;
 
 public:
   SchedulerProcess(const string &_master,
@@ -246,20 +263,16 @@ protected:
 	// Remove the offer since we saved all the PIDs we might use.
         savedOffers.erase(oid);
 
-	// TODO(alig|benh): Walk through scenario if the master dies
-	// after it sends out M2S_RUN_TASK messages?
-
         if (isFT) {
-          TimeoutListener *tListener = new TimeoutListener(this, tasks);
+          foreach(const TaskDescription &task, tasks) {
+            ReliableReply *rr = new ReliableReply(self(), task.taskId);
+            reliableReplies[task.taskId] = rr;
+            link(spawn(rr));
+          }
+        }
+        
+        send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
 
-          string ftId = ftMsg->getNextId();
-          DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
-          ftMsg->reliableSend(ftId,
-			      pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
-			      tListener);
-        } else {
-          send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
-	}
 	
         break;
       }
@@ -294,6 +307,11 @@ protected:
           break;
         DLOG(INFO) << "FT: Received message with id: " << ftId;
 
+        if (state == TASK_RUNNING) {
+          send(reliableReplies[tid]->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+          reliableReplies.erase(tid);
+        }
+
         TaskStatus status(tid, state, data);
         invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
         break;