You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:41:26 UTC

svn commit: r1132101 - in /incubator/mesos/trunk: src/messaging/messages.hpp src/sched/sched.cpp src/slave/slave.cpp third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp

Author: benh
Date: Sun Jun  5 08:41:25 2011
New Revision: 1132101

URL: http://svn.apache.org/viewvc?rev=1132101&view=rev
Log:
Fixed serious bug with reliable messaging that effectively made a system for sharing the cluster a system for running only one thing on the cluster at a time. :| Oops. For posterity, see commit edd68032cf56177c3a5c157858782ea31921689f for a reference to when I introduced this bug unknowingly.
(cherry picked from commit f4080a9bac48f0b416b73fd0a46f6785f2bc8cb1)

Modified:
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/third_party/libprocess/reliable.hpp

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 08:41:25 2011
@@ -164,6 +164,13 @@ protected:
     return ReliableProcess::rsend(to, ID, data.data(), data.size());
   }
 
+  template <MSGID ID>
+  int rsend(const PID &via, const PID &to, const tuple<ID> &t)
+  {
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+    return ReliableProcess::rsend(via, to, ID, data.data(), data.size());
+  }
+
   virtual MSGID receive() { return receive(0); }
 
   virtual MSGID receive(double secs)

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 08:41:25 2011
@@ -77,14 +77,14 @@ protected:
 
       case PROCESS_TIMEOUT: {
         terminate = true;
-        VLOG(1) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
+        VLOG(1) << "No status updates received for tid:" << tid
+		<< ". Assuming task was lost.";
         send(parent, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
         break;
       }
 
       }
     }
-    VLOG(1) << "FT: Exiting reliable reply for tid:" << tid;
   }
 
 private:
@@ -178,7 +178,7 @@ protected:
 	PID masterPid;
 	tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
 
-	LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+	VLOG(1) << "New master at " << masterPid << " with ID:" << masterSeq;
 
         redirect(master, masterPid);
 	master = masterPid;
@@ -283,10 +283,12 @@ protected:
 
 	tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
 
-        if (duplicate())
+        if (duplicate()) {
+          VLOG(1) << "Received a duplicate status update for tid " << tid
+		     << ", status = " << state;
           break;
+        }
         ack();
-        VLOG(1) << "FT: Received message with id: " << seq();
 
 	unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
 	if (it != rbReplies.end()) {
@@ -347,7 +349,7 @@ protected:
 
       case PROCESS_EXIT: {
 	// TODO(benh): Don't wait for a new master forever.
-	LOG(WARNING) << "Connection to master lost .. waiting for new master.";
+	VLOG(1) << "Connection to master lost .. waiting for new master.";
         break;
       }
 

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 08:41:25 2011
@@ -343,20 +343,28 @@ void Slave::operator () ()
         TaskState taskState;
         string data;
         tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
-        LOG(INFO) << "Got status update for task " << fid << ":" << tid;
-        if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
-            taskState == TASK_KILLED || taskState == TASK_LOST) {
-          LOG(INFO) << "Task " << fid << ":" << tid << " done";
-          if (Framework *fw = getFramework(fid)) {
-            fw->removeTask(tid);
-            isolationModule->resourcesChanged(fw);
+
+        Framework *framework = getFramework(fid);
+        if (framework != NULL) {
+	  LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+	  if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+	      taskState == TASK_KILLED || taskState == TASK_LOST) {
+	    LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+            framework->removeTask(tid);
+            isolationModule->resourcesChanged(framework);
           }
-        }
 
-        // Reliably send message and save sequence number for canceling later.
-        int seq = rsend(master, pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
-                                                           taskState, data));
-        seqs[fid].insert(seq);
+	  // Reliably send message and save sequence number for
+	  // canceling later.
+	  int seq = rsend(master, framework->pid,
+			  pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
+						     taskState, data));
+	  seqs[fid].insert(seq);
+	} else {
+	  LOG(WARNING) << "Got status update for UNKNOWN task "
+		       << fid << ":" << tid;
+	}
         break;
       }
 

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun  5 08:41:25 2011
@@ -4,7 +4,9 @@
 #include "foreach.hpp"
 #include "reliable.hpp"
 
+using std::make_pair;
 using std::map;
+using std::pair;
 
 #define malloc(bytes)                                               \
   ({ void *tmp;                                                     \
@@ -34,10 +36,11 @@ struct rmsg
 class ReliableSender : public Process
 {
 public:
+  PID via;
   struct rmsg *rmsg;
 
-  ReliableSender(struct rmsg *_rmsg)
-    : rmsg(_rmsg) {}
+  ReliableSender(const PID &_via, struct rmsg *_rmsg)
+    : via(_via), rmsg(_rmsg) {}
 
   ~ReliableSender()
   {
@@ -51,7 +54,7 @@ protected:
   void operator () ()
   {
     do {
-      send(rmsg->msg.to, RELIABLE_MSG, (char *) rmsg,
+      send(via, RELIABLE_MSG, (char *) rmsg,
 	   sizeof(struct rmsg) + rmsg->msg.len);
 
       switch (receive(RELIABLE_TIMEOUT)) {
@@ -59,7 +62,11 @@ protected:
 	  // All done!
 	  return;
 	}
-        case RELIABLE_REDIRECT: {
+        case RELIABLE_REDIRECT_VIA: {
+	  via = *reinterpret_cast<const PID *>(body(NULL));
+	  break;
+	}
+        case RELIABLE_REDIRECT_TO: {
 	  rmsg->msg.to = *reinterpret_cast<const PID *>(body(NULL));
 	  break;
 	}
@@ -74,7 +81,7 @@ protected:
 
 
 ReliableProcess::ReliableProcess()
-  : current(NULL), nextSeq(0) {}
+  : current(NULL) {}
 
 
 ReliableProcess::~ReliableProcess()
@@ -110,9 +117,11 @@ bool ReliableProcess::duplicate() const
   // greater than the last one we saw. Note that we don't add the
   // sequence identifier for the current message until the next
   // 'receive' invocation (see below).
-  if (current != NULL)
-    if (recvSeqs.count(current->msg.from) > 0)
-      return current->seq <= recvSeqs.find(current->msg.from)->second;
+  if (current != NULL) {
+    pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+    if (recvSeqs.count(from_to) > 0)
+      return current->seq <= recvSeqs.find(from_to)->second;
+  }
 
   return false;
 }
@@ -123,7 +132,16 @@ PID ReliableProcess::origin() const
   if (current != NULL)
     return current->msg.from;
 
-  return PID();
+  return from();
+}
+
+
+PID ReliableProcess::destination() const
+{
+  if (current != NULL)
+    return current->msg.to;
+
+  return self();
 }
 
 
@@ -135,10 +153,10 @@ void ReliableProcess::ack()
 }
 
 
-bool ReliableProcess::forward(const PID &to)
+bool ReliableProcess::forward(const PID &via)
 {
   if (current != NULL) {
-    send(to, RELIABLE_MSG, (char *) current,
+    send(via, RELIABLE_MSG, (char *) current,
 	 sizeof(struct rmsg) + current->msg.len);
     return true;
   }
@@ -152,7 +170,36 @@ int ReliableProcess::rsend(const PID &to
   // Allocate/Initialize outgoing message.
   struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
 
-  int seq = nextSeq++;
+  int seq = sentSeqs[to]++;
+
+  rmsg->seq = seq;
+
+  rmsg->msg.from.pipe = self().pipe;
+  rmsg->msg.from.ip = self().ip;
+  rmsg->msg.from.port = self().port;
+  rmsg->msg.to.pipe = to.pipe;
+  rmsg->msg.to.ip = to.ip;
+  rmsg->msg.to.port = to.port;
+  rmsg->msg.id = id;
+  rmsg->msg.len = length;
+
+  if (length > 0)
+    memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
+
+  ReliableSender *sender = new ReliableSender(to, rmsg);
+  PID pid = link(spawn(sender));
+  senders[pid] = sender;
+
+  return seq;
+}
+
+
+int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length)
+{
+  // Allocate/Initialize outgoing message.
+  struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
+
+  int seq = sentSeqs[to]++;
 
   rmsg->seq = seq;
 
@@ -168,7 +215,7 @@ int ReliableProcess::rsend(const PID &to
   if (length > 0)
     memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
 
-  ReliableSender *sender = new ReliableSender(rmsg);
+  ReliableSender *sender = new ReliableSender(via, rmsg);
   PID pid = link(spawn(sender));
   senders[pid] = sender;
 
@@ -185,9 +232,10 @@ MSGID ReliableProcess::receive(double se
     // can be sure that the current message is the next in the
     // sequence (unless it's the first message or a duplicate).
     if (!duplicate()) {
-      assert((recvSeqs.count(current->msg.from) == 0) ||
-	     (recvSeqs[current->msg.from] + 1 == current->seq));
-      recvSeqs[current->msg.from] = current->seq;
+      pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+      assert((recvSeqs.count(from_to) == 0) ||
+	     (recvSeqs[from_to] + 1 == current->seq));
+      recvSeqs[from_to] = current->seq;
     }
     free(current);
     current = NULL;
@@ -222,8 +270,9 @@ MSGID ReliableProcess::receive(double se
 	memcpy((char *) current, data, length);
 
 	// TODO(benh): Don't ignore out-of-order messages!
-	if (recvSeqs.count(current->msg.from) > 0 &&
-            recvSeqs[current->msg.from] + 1 < current->seq) {
+	pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+	if (recvSeqs.count(from_to) > 0 &&
+            recvSeqs[from_to] + 1 < current->seq) {
           free(current);
           current = NULL;
           continue;
@@ -261,8 +310,10 @@ void ReliableProcess::redirect(const PID
   foreachpair (const PID &pid, ReliableSender *sender, senders) {
     assert(pid == sender->self());
     // TODO(benh): Don't look into sender's class like this ... HACK!
+    if (existing == sender->via)
+      send(pid, RELIABLE_REDIRECT_VIA, (char *) &updated, sizeof(PID));
     if (existing == sender->rmsg->msg.to)
-      send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
+      send(pid, RELIABLE_REDIRECT_TO, (char *) &updated, sizeof(PID));
   }
 }
 

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun  5 08:41:25 2011
@@ -3,6 +3,7 @@
 
 #include <process.hpp>
 
+#include <functional>
 #include <map>
 
 #define RELIABLE_TIMEOUT 10
@@ -11,7 +12,8 @@
 enum {
   RELIABLE_MSG = PROCESS_MSGID,
   RELIABLE_ACK,
-  RELIABLE_REDIRECT,
+  RELIABLE_REDIRECT_VIA,
+  RELIABLE_REDIRECT_TO,
   RELIABLE_MSGID
 };
 
@@ -41,11 +43,18 @@ protected:
   virtual bool duplicate() const;
 
   /**
-   * @return origin of current message, or equal to @see Process::from().
+   * @return origin of current message (if current message is not
+   * reliable this returns Process::from()).
    */
   virtual PID origin() const;
 
   /**
+   * @return destination of current message (if current message is not
+   * reliable this returns Process::self()).
+   */
+  virtual PID destination() const;
+
+  /**
    * Acknowledges the current message by sending an 'ack' back to the
    * origin, or does nothing if the current message is not _reliable_.
    */
@@ -53,11 +62,11 @@ protected:
 
   /**
    * Forward current message (provided it is _reliable_).
-   * @param to destination
+   * @param via hop (or possibly destination)
    * @return false if the current message is not _reliable_, true
    * otherwise.
    */
-  virtual bool forward(const PID &to);
+  virtual bool forward(const PID &via);
 
   /**
    * Sends a _reliable_ message to PID.
@@ -68,6 +77,16 @@ protected:
   virtual int rsend(const PID &to, MSGID id);
 
   /**
+   * Sends a _reliable_ message via another PID (meant to be
+   * forwarded).
+   * @param via hop
+   * @param to destination
+   * @param id message id
+   * @return sequence number of message
+   */
+  virtual int rsend(const PID &via, const PID &to, MSGID id);
+
+  /**
    * Sends a _reliable_ message with data to PID.
    * @param to destination
    * @param id message id
@@ -77,6 +96,19 @@ protected:
    */
   virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
 
+  /**
+   * Sends a _reliable_ message with data via another process (meant
+   * to be forwarded).
+   * @param via hop
+   * @param to destination
+   * @param id message id
+   * @param data payload
+   * @param length payload length
+   * @return sequence number of message
+   */
+  virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
+
+
   /* Blocks for message indefinitely. */
   virtual MSGID receive();
 
@@ -99,8 +131,8 @@ protected:
   
 private:
   struct rmsg *current;
-  int nextSeq;
-  std::map<PID, int> recvSeqs;
+  std::map<PID, int> sentSeqs;
+  std::map<std::pair<PID, PID>, int> recvSeqs;
   std::map<PID, ReliableSender *> senders;
 };
 
@@ -111,6 +143,12 @@ inline int ReliableProcess::rsend(const 
 }
 
 
+inline int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id)
+{
+  return rsend(via, to, id, NULL, 0);
+}
+
+
 inline MSGID ReliableProcess::receive()
 {
   return receive(0);