You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:57:41 UTC

svn commit: r1132198 - in /incubator/mesos/trunk: src/master/master.cpp src/messaging/messages.hpp src/sched/sched.cpp src/slave/slave.cpp src/tests/master_test.cpp third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp

Author: benh
Date: Sun Jun  5 08:57:40 2011
New Revision: 1132198

URL: http://svn.apache.org/viewvc?rev=1132198&view=rev
Log:
Added improved support for forwarding reliable messages that allows changing the body of the forwarded message and used the new support to eliminate redundant *_FT_STATUS_UPDATE messages.

Modified:
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/tests/master_test.cpp
    incubator/mesos/trunk/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/third_party/libprocess/reliable.hpp

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 08:57:40 2011
@@ -541,25 +541,20 @@ void Master::operator () ()
       break;
     }
 
-    case S2M_FT_STATUS_UPDATE: {
+    case S2M_STATUS_UPDATE: {
       SlaveID sid;
       FrameworkID fid;
       TaskID tid;
       TaskState state;
       string data;
-      tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
+      tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
 
-      VLOG(1) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
       if (Slave *slave = lookupSlave(sid)) {
         if (Framework *framework = lookupFramework(fid)) {
-	  // Pass on the status update to the framework.
-	  // TODO(benh): Do we not want to forward the
-	  // S2M_FT_STATUS_UPDATE message? This seems a little tricky
-	  // because we really wanted to send the M2F_FT_STATUS_UPDATE
-	  // message.
-          forward(framework->pid);
+	  // Pass on the (transformed) status update to the framework.
+          forward(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
           if (duplicate()) {
-            LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
+            LOG(WARNING) << "Locally ignoring duplicate message with id:" << seq();
             break;
           }
           // Update the task state locally.
@@ -575,48 +570,14 @@ void Master::operator () ()
             }
           }
         } else {
-          LOG(ERROR) << "S2M_FT_STATUS_UPDATE error: couldn't lookup "
+          LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup "
                      << "framework id " << fid;
         }
       } else {
-        LOG(ERROR) << "S2M_FT_STATUS_UPDATE error: couldn't lookup slave id "
-                   << sid;
-      }
-      break;
-    }
-
-    case S2M_STATUS_UPDATE: {
-      SlaveID sid;
-      FrameworkID fid;
-      TaskID tid;
-      TaskState state;
-      string data;
-      tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
-      if (Slave *slave = lookupSlave(sid)) {
-        if (Framework *framework = lookupFramework(fid)) {
-          // Pass on the status update to the framework
-          send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
-          // Update the task state locally
-          Task *task = slave->lookupTask(fid, tid);
-          if (task != NULL) {
-            LOG(INFO) << "Status update: " << task << " is in state " << state;
-            task->state = state;
-            // Remove the task if it finished or failed
-            if (state == TASK_FINISHED || state == TASK_FAILED ||
-                state == TASK_KILLED || state == TASK_LOST) {
-              LOG(INFO) << "Removing " << task << " because it's done";
-              removeTask(task, TRR_TASK_ENDED);
-            }
-          }
-        } else {
-          LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup framework id "
-                     << fid;
-        }
-      } else {
         LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup slave id "
                    << sid;
       }
-     break;
+      break;
     }
 
     case S2M_FRAMEWORK_MESSAGE: {

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 08:57:40 2011
@@ -47,7 +47,6 @@ enum MessageType {
   M2F_SLOT_OFFER,
   M2F_RESCIND_OFFER,
   M2F_STATUS_UPDATE,
-  M2F_FT_STATUS_UPDATE,
   M2F_LOST_SLAVE,
   M2F_FRAMEWORK_MESSAGE,
   M2F_ERROR,
@@ -57,7 +56,6 @@ enum MessageType {
   S2M_REREGISTER_SLAVE,
   S2M_UNREGISTER_SLAVE,
   S2M_STATUS_UPDATE,
-  S2M_FT_STATUS_UPDATE,
   S2M_FRAMEWORK_MESSAGE,
   S2M_LOST_EXECUTOR,
 
@@ -168,6 +166,13 @@ protected:
   }
 
   template <MSGID ID>
+  bool forward(const PID &to, const tuple<ID> &t)
+  {
+    const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t); // Version-prefixed body, same framing as rsend.
+    return ReliableProcess::forward(to, ID, data.data(), data.size()); // Propagate the result; a bool function must not fall off its end.
+  }
+
+  template <MSGID ID>
   int rsend(const PID &to, const tuple<ID> &t)
   {
     const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
@@ -265,11 +270,6 @@ TUPLE(M2F_STATUS_UPDATE,
        TaskState,
        std::string));
 
-TUPLE(M2F_FT_STATUS_UPDATE,
-      (TaskID,
-       TaskState,
-       std::string));
-
 TUPLE(M2F_LOST_SLAVE,
       (SlaveID));
 
@@ -303,13 +303,6 @@ TUPLE(S2M_STATUS_UPDATE,
        TaskState,
        std::string));
 
-TUPLE(S2M_FT_STATUS_UPDATE,
-      (SlaveID,
-       FrameworkID,
-       TaskID,
-       TaskState,
-       std::string));
-
 TUPLE(S2M_FRAMEWORK_MESSAGE,
       (SlaveID,
        FrameworkID,

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 08:57:40 2011
@@ -225,20 +225,12 @@ protected:
         break;
       }
 
-	// TODO(benh): Fix forwarding issues.
-//       case M2F_FT_STATUS_UPDATE: {
-//         TaskID tid;
-//         TaskState state;
-//         string data;
-//         unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
-      case S2M_FT_STATUS_UPDATE: {
-	SlaveID sid;
-	FrameworkID fid;
-	TaskID tid;
-	TaskState state;
-	string data;
+      case M2F_STATUS_UPDATE: {
+        TaskID tid;
+        TaskState state;
+        string data;
 
-	tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
+        tie(tid, state, data) = unpack<M2F_STATUS_UPDATE>(body());
 
         if (duplicate()) {
           VLOG(1) << "Received a duplicate status update for tid " << tid
@@ -262,26 +254,6 @@ protected:
         break;
       }
 
-      case M2F_STATUS_UPDATE: {
-        TaskID tid;
-        TaskState state;
-        string data;
-        tie(tid, state, data) = unpack<M2F_STATUS_UPDATE>(body());
-
-        // Stop any status update timers we might have had running.
-        if (timers.count(tid) > 0) {
-          StatusUpdateTimer* timer = timers[tid];
-          timers.erase(tid);
-          send(timer->self(), MESOS_MSGID);
-          wait(timer->self());
-          delete timer;
-        }
-
-        TaskStatus status(tid, state, data);
-        invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
-        break;
-      }
-
       case M2F_FRAMEWORK_MESSAGE: {
         FrameworkMessage msg;
         tie(msg) = unpack<M2F_FRAMEWORK_MESSAGE>(body());

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 08:57:40 2011
@@ -369,8 +369,8 @@ void Slave::operator () ()
 	  // Reliably send message and save sequence number for
 	  // canceling later.
 	  int seq = rsend(master, framework->pid,
-			  pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
-						     taskState, data));
+			  pack<S2M_STATUS_UPDATE>(id, fid, tid,
+                                                  taskState, data));
 	  seqs[fid].insert(seq);
 	} else {
 	  LOG(WARNING) << "Got status update for UNKNOWN task "

Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun  5 08:57:40 2011
@@ -623,7 +623,7 @@ TEST(MasterTest, SchedulerFailoverStatus
   EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
     .Times(1);
 
-  EXPECT_MSG(filter, Eq(S2M_FT_STATUS_UPDATE), _, Ne(master))
+  EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
     .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
     .RetiresOnSaturation();
 

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun  5 08:57:40 2011
@@ -154,10 +154,10 @@ void ReliableProcess::ack()
 }
 
 
-bool ReliableProcess::forward(const PID &via)
+bool ReliableProcess::forward(const PID &to)
 {
   if (current != NULL) {
-    send(via, RELIABLE_MSG, (char *) current,
+    send(to, RELIABLE_MSG, (char *) current,
 	 sizeof(struct rmsg) + current->msg.len);
     return true;
   }
@@ -166,6 +166,37 @@ bool ReliableProcess::forward(const PID 
 }
 
 
+bool ReliableProcess::forward(const PID &to, MSGID id, const char *data, size_t length)
+{ // Re-wraps the current message with a new id/body and sends it to 'to'; returns false if there is no current message.
+  if (current != NULL) {
+    struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length); // NOTE(review): malloc result is not checked before use.
+
+    rmsg->seq = current->seq; // Keep the sequence number of the message being forwarded.
+
+    rmsg->msg.from.pipe = current->msg.from.pipe; // Sender fields are copied from the current message, not from this process.
+    rmsg->msg.from.ip = current->msg.from.ip;
+    rmsg->msg.from.port = current->msg.from.port;
+    rmsg->msg.to.pipe = to.pipe; // Destination fields are taken from 'to'.
+    rmsg->msg.to.ip = to.ip;
+    rmsg->msg.to.port = to.port;
+    rmsg->msg.id = id; // The caller-supplied id and length replace those of the current message.
+    rmsg->msg.len = length;
+
+    if (length > 0) // Skip the copy for an empty body.
+      memcpy((char *) rmsg + sizeof(struct rmsg), data, length); // Body is stored immediately after the rmsg header.
+
+    send(to, RELIABLE_MSG, (char *) rmsg,
+	 sizeof(struct rmsg) + rmsg->msg.len);
+
+    free(rmsg); // Presumably send() has copied the buffer by now -- TODO confirm.
+
+    return true;
+  }
+
+  return false;
+}
+
+
 int ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
 {
   // Allocate/Initialize outgoing message.

Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun  5 08:57:40 2011
@@ -62,29 +62,22 @@ protected:
 
   /**
    * Forward current message (provided it is _reliable_).
-   * @param via hop (or possibly destination)
+   * @param to hop (or possibly destination)
    * @return false if the current message is not _reliable_, true
    * otherwise.
    */
-  virtual bool forward(const PID &via);
+  virtual bool forward(const PID &to);
 
   /**
-   * Sends a _reliable_ message to PID.
-   * @param to destination
-   * @param id message id
-   * @return sequence number of message
-   */
-  virtual int rsend(const PID &to, MSGID id);
-
-  /**
-   * Sends a _reliable_ message via another PID (meant to be
-   * forwarded).
-   * @param via hop
-   * @param to destination
-   * @param id message id
-   * @return sequence number of message
+   * Transform current message with specified id and data and forward
+   * it (provided it is _reliable_). This effectively allows an
+   * intermediate receiver that shouldn't be responsible for the
+   * acknowledgement to change the body of the message as it needs.
+   * @param to hop (or possibly destination)
+   * @return false if the current message is not _reliable_, true
+   * otherwise.
    */
-  virtual int rsend(const PID &via, const PID &to, MSGID id);
+  virtual bool forward(const PID &to, MSGID id, const char *data, size_t length);
 
   /**
    * Sends a _reliable_ message with data to PID.
@@ -94,7 +87,7 @@ protected:
    * @param length payload length
    * @return sequence number of message
    */
-  virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
+  virtual int rsend(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
 
   /**
    * Sends a _reliable_ message with data via another process (meant
@@ -106,14 +99,10 @@ protected:
    * @param length payload length
    * @return sequence number of message
    */
-  virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
+  virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
 
-
-  /* Blocks for message indefinitely. */
-  virtual MSGID receive();
-
-  /* Blocks for message at most specified seconds. */
-  virtual MSGID receive(double secs);
+  /* Blocks for message at most specified seconds (0 implies forever). */
+  virtual MSGID receive(double secs = 0);
 
   /**
    * Redirect unacknolwedged messages to be sent to a different PID.
@@ -137,22 +126,4 @@ private:
 };
 
 
-inline int ReliableProcess::rsend(const PID &to, MSGID id)
-{
-  return rsend(to, id, NULL, 0);
-}
-
-
-inline int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id)
-{
-  return rsend(via, to, id, NULL, 0);
-}
-
-
-inline MSGID ReliableProcess::receive()
-{
-  return receive(0);
-}
-
-
 #endif /* __RELIABLE_HPP__ */