You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:37 UTC

svn commit: r1131604 - in /incubator/mesos/trunk/src: ft_messaging.cpp ft_messaging.hpp master.cpp memhog.cpp messages.hpp nexus_sched.cpp

Author: benh
Date: Sun Jun  5 03:28:37 2011
New Revision: 1131604

URL: http://svn.apache.org/viewvc?rev=1131604&view=rev
Log:
ReplyToOffer will now time out and send faked TASK_LOST messages to itself in the Scheduler/Framework

Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/ft_messaging.hpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/memhog.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 03:28:37 2011
@@ -76,7 +76,12 @@ void FTMessaging::sendOutstanding() {
   } 
 
   foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
-    if (msg.count < FT_MAX_RESENDS) {
+    if (msg.callback != NULL) {
+      DLOG(INFO) << "FT: calling timeout listener";
+      msg.callback->timeout();
+      delete msg.callback; // ugly and sad. shared_ptr would have been better
+      outMsgs.erase(ftId);
+    } else if (msg.count < FT_MAX_RESENDS) {
       DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
       Process::post(master, msg.id, msg.data.data(), msg.data.size());
       msg.count++;

Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun  5 03:28:37 2011
@@ -63,26 +63,37 @@ using boost::unordered_set;
 class EmptyClass {
 };
 
+
+/**
+ * Interface used to signal that a message has timed out. 
+ */
+class FTCallback {
+public:
+  /**
+   * Called from sendOutstanding() if a message has a callback object and it has timed out.
+   */
+  virtual void timeout() = 0;
+};
+
 /**
  * Used in FTMessaging to store unacked messages, their count, ftId, libprocess id.
  * @see FTMessaging
  */
 struct FTStoredMsg {
 
-  FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) : 
-    ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)) {}
+  FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id, FTCallback *_cb=NULL) : 
+    ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)), callback(_cb) {}
 
-  FTStoredMsg() : ftId(""), data(""), id(), count(1), ts(time(0)) {}
+  FTStoredMsg(bool _cb=false) : ftId(""), data(""), id(), count(1), ts(time(0)), callback(NULL) {}
 
   string ftId;
   string data;
   MSGID id;
   long count;
   time_t ts;   // not currently used
+  FTCallback *callback;
 };
 
-
-
 /**
  * Singleton class that provides functionality for reliably sending messages, 
  * resending them on timeout, acking received messages, and dropping duplicates.
@@ -113,13 +124,14 @@ public:
    * @see getNextId().
    * @param ftId string representing the unique FT id of the message
    * @param msgTuple libprocess tuple<ID> 
+   * @param callback if not null, callback->timeout() will be called by sendOutstanding()
    */
-  template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
+  template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple, FTCallback *callback=NULL)
   {
     DLOG(INFO) << "FT: sending " << ftId;
     string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
-    FTStoredMsg sm(ftId, msgStr, ID);
-    outMsgs[ftId] = sm;
+
+    outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID, callback);
     if (!master) {
       DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
       return;

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:28:37 2011
@@ -353,6 +353,35 @@ void Master::operator () ()
       break;
     }
 
+    case F2M_FT_SLOT_OFFER_REPLY: {
+      FrameworkID fid;
+      OfferID oid;
+      vector<TaskDescription> tasks;
+      Params params;
+      string ftId, senderStr;
+      unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
+      if (!ftMsg->acceptMessageAck(senderStr, ftId)) {
+        LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+        break;
+      } 
+      Framework *framework = lookupFramework(fid);
+      if (framework != NULL) {
+	SlotOffer *offer = lookupSlotOffer(oid);
+	if (offer != NULL) {
+	  processOfferReply(offer, tasks, params);
+	} else {
+	  // The slot offer is gone, meaning that we rescinded it or that
+	  // the slave was lost; immediately report any tasks in it as lost
+	  foreach (TaskDescription &t, tasks) {
+	    send(framework->pid,
+		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+	  }
+	}
+      } else
+        DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
+      break;
+    }
+
     case F2M_SLOT_OFFER_REPLY: {
       FrameworkID fid;
       OfferID oid;

Modified: incubator/mesos/trunk/src/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog.cpp (original)
+++ incubator/mesos/trunk/src/memhog.cpp Sun Jun  5 03:28:37 2011
@@ -77,6 +77,10 @@ public:
 
   virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
     cout << endl << "Task " << status.taskId << " is in state " << status.state << endl;
+    if (status.state == TASK_LOST) 
+       {
+	  cout << endl << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
+       }
     if (status.state == TASK_FINISHED)
       tasksFinished++;
     if (tasksFinished == totalTasks)

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 03:28:37 2011
@@ -23,6 +23,7 @@ enum MessageType {
   F2M_REREGISTER_FRAMEWORK,
   F2M_UNREGISTER_FRAMEWORK,
   F2M_SLOT_OFFER_REPLY,
+  F2M_FT_SLOT_OFFER_REPLY,
   F2M_REVIVE_OFFERS,
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
@@ -136,6 +137,14 @@ TUPLE(F2M_SLOT_OFFER_REPLY,
        std::vector<TaskDescription>,
        Params));
 
+TUPLE(F2M_FT_SLOT_OFFER_REPLY,
+      (std::string, /* FT ID */
+       std::string, /* original sender */
+       FrameworkID,
+       OfferID,
+       std::vector<TaskDescription>,
+       Params));
+
 TUPLE(F2M_REVIVE_OFFERS,
       (FrameworkID));
 

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 03:28:37 2011
@@ -1,3 +1,6 @@
+
+#define FT_TIMEOUT 10
+
 #include <dlfcn.h>
 #include <errno.h>
 #include <pwd.h>
@@ -56,6 +59,8 @@ namespace nexus { namespace internal {
  * any synchronization necessary is performed.
  */
 
+    
+
 class SchedulerProcess : public Tuple<Process>
 {
 public:
@@ -100,6 +105,26 @@ private:
     PID parentPID;
   } schedLeaderListener;
 
+  
+  class TimeoutListener;
+  friend class TimeoutListener;
+
+  class TimeoutListener : public FTCallback {
+  public:
+    TimeoutListener(SchedulerProcess *s, vector<TaskDescription> t) : parent(s), tasks(t) {}
+ 
+   virtual void timeout() {
+      foreach (TaskDescription &t, tasks) {
+        DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
+        parent->send( parent->self(), 
+                      pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+      }
+    }
+
+  private:
+    SchedulerProcess *parent;
+    vector<TaskDescription> tasks;
+  };
 
 public:
   SchedulerProcess(const PID &_master,
@@ -175,7 +200,7 @@ protected:
       if (terminate)
         return;
 
-      switch(receive()) {
+      switch(receive(FT_TIMEOUT)) {
       case M2F_REGISTER_REPLY: {
         unpack<M2F_REGISTER_REPLY>(fid);
         invoke(bind(&Scheduler::registered, sched, driver, fid));
@@ -214,6 +239,7 @@ protected:
 
 
       case M2F_STATUS_UPDATE: {
+        DLOG(INFO) << "Got M2F_STATUS_UPDATE";
         TaskID tid;
         TaskState state;
         string data;
@@ -283,6 +309,10 @@ protected:
 	send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
 	break;
       }
+      case PROCESS_TIMEOUT: {
+        ftMsg->sendOutstanding();
+	break;
+      }
       default: {
         ostringstream oss;
         oss << "SchedulerProcess received unknown message " << msgid()
@@ -480,6 +510,20 @@ void NexusSchedulerDriver::replyToOffer(
 
   // TODO(benh): Do a Process::post instead?
 
+  if (process->isFT) {
+    SchedulerProcess::TimeoutListener *tListener = 
+      new SchedulerProcess::TimeoutListener(process, tasks);
+
+    string ftId = process->ftMsg->getNextId();
+    process->ftMsg->reliableSend( ftId,
+                                  process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
+                                                                         process->self(),
+                                                                         process->fid,
+                                                                         offerId,
+                                                                         tasks,
+                                                                         Params(params)),
+                                  tListener);
+  } else
   process->send(process->master,
                 process->pack<F2M_SLOT_OFFER_REPLY>(process->fid,
                                                     offerId,