You are viewing a plain-text version of this content. The canonical link for it appeared here as a hyperlink, which was not preserved in this plain-text rendering.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:43:16 UTC

svn commit: r1131873 - in /incubator/mesos/trunk/src: slave.cpp slave.hpp third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp

Author: benh
Date: Sun Jun  5 05:43:16 2011
New Revision: 1131873

URL: http://svn.apache.org/viewvc?rev=1131873&view=rev
Log:
Added the ability to cancel a reliable message (so that it is not continually retried). Closes #65.

Modified:
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/slave.hpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:43:16 2011
@@ -306,7 +306,10 @@ void Slave::operator () ()
 	const string &msg =
 	  tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState,
 						   data));
-	rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
+
+        // Reliably send message and save sequence number for canceling later.
+	int seq = rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
+        seqs[fid].insert(seq);
         break;
       }
 
@@ -434,7 +437,16 @@ void Slave::removeExecutor(FrameworkID f
 void Slave::killFramework(Framework *fw)
 {
   LOG(INFO) << "Cleaning up framework " << fw->id;
+
+  // Cancel sending any reliable messages for this framework.
+  foreach (int seq, seqs[fw->id])
+    cancel(seq);
+
+  seqs.erase(fw->id);
+
+  // Remove its allocated resources.
   fw->resources = Resources();
+
   // If an executor is running, tell it to exit and kill it
   if (Executor *ex = getExecutor(fw->id)) {
     send(ex->pid, pack<S2E_KILL_EXECUTOR>());

Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun  5 05:43:16 2011
@@ -169,6 +169,9 @@ public:
   ExecutorMap executors;  // Invariant: framework will exist if executor exists
   IsolationModule *isolationModule;
 
+  // Sequence numbers of reliable messages sent on behalf of framework.
+  unordered_map<FrameworkID, unordered_set<int> > seqs;
+
 public:
   Slave(Resources resources, bool local, IsolationModule *isolationModule);
 

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun  5 05:43:16 2011
@@ -73,7 +73,8 @@ protected:
 };
 
 
-ReliableProcess::ReliableProcess() : current(NULL) {}
+ReliableProcess::ReliableProcess()
+  : current(NULL), nextSeq(0) {}
 
 
 ReliableProcess::~ReliableProcess()
@@ -151,12 +152,14 @@ bool ReliableProcess::forward(const PID 
 }
 
 
-void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
+int ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
 {
   // Allocate/Initialize outgoing message.
   struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
 
-  rmsg->seq = sentSeqs[to]++;
+  int seq = nextSeq++;
+
+  rmsg->seq = seq;
 
   rmsg->msg.from.pipe = self().pipe;
   rmsg->msg.from.ip = self().ip;
@@ -173,6 +176,8 @@ void ReliableProcess::rsend(const PID &t
   ReliableSender *sender = new ReliableSender(rmsg);
   PID pid = link(spawn(sender));
   senders[pid] = sender;
+
+  return seq;
 }
 
 
@@ -262,3 +267,18 @@ void ReliableProcess::redirect(const PID
       send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
   }
 }
+
+
+void ReliableProcess::cancel(int seq)
+{
+  foreachpair (const PID &pid, ReliableSender *sender, senders) {
+    assert(pid == sender->getPID());
+    // Shut it down by sending it an ack. It will get cleaned up via
+    // the PROCESS_EXIT above.
+    // TODO(benh): Don't look into sender's class like this ... HACK!
+    if (seq == sender->rmsg->seq) {
+      send(pid, RELIABLE_ACK);
+      break;
+    }
+  }
+}

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun  5 05:43:16 2011
@@ -63,8 +63,9 @@ protected:
    * Sends a _reliable_ message to PID.
    * @param to destination
    * @param id message id
+   * @return sequence number of message
    */
-  virtual void rsend(const PID &to, MSGID id);
+  virtual int rsend(const PID &to, MSGID id);
 
   /**
    * Sends a _reliable_ message with data to PID.
@@ -72,8 +73,9 @@ protected:
    * @param id message id
    * @param data payload
    * @param length payload length
+   * @return sequence number of message
    */
-  virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
+  virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
 
   /* Blocks for message indefinitely. */
   virtual MSGID receive();
@@ -87,18 +89,25 @@ protected:
    * @param updated the new PID
    */
   virtual void redirect(const PID &existing, const PID &updated);
+
+  /**
+   * Cancel trying to reliably send the message with the specified
+   * sequence number.
+   * @param seq sequence number of message to cancel
+   */
+  virtual void cancel(int seq);
   
 private:
   struct rmsg *current;
-  std::map<PID, int> sentSeqs;
+  int nextSeq;
   std::map<PID, int> recvSeqs;
   std::map<PID, ReliableSender *> senders;
 };
 
 
-inline void ReliableProcess::rsend(const PID &to, MSGID id)
+inline int ReliableProcess::rsend(const PID &to, MSGID id)
 {
-  rsend(to, id, NULL, 0);
+  return rsend(to, id, NULL, 0);
 }