You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:36:10 UTC

svn commit: r1131811 - in /incubator/mesos/trunk/src: master.cpp nexus_sched.cpp

Author: benh
Date: Sun Jun  5 05:36:10 2011
New Revision: 1131811

URL: http://svn.apache.org/viewvc?rev=1131811&view=rev
Log:
Fixing regressions introduced by porting to ReliableProcess.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/nexus_sched.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131811&r1=1131810&r2=1131811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:36:10 2011
@@ -633,8 +633,8 @@ void Master::operator () ()
         FrameworkID fid = pidToFid[from()];
         if (Framework *framework = lookupFramework(fid)) {
           LOG(INFO) << framework << " disconnected";
-	  // TODO(benh): Only wait a specified timeout for another
-	  // scheduler for this framework to reconnect.
+	  // TODO(benh): Wait for a framework failover.
+	  removeFramework(framework);
         }
       } else if (pidToSid.find(from()) != pidToSid.end()) {
         SlaveID sid = pidToSid[from()];

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131811&r1=1131810&r2=1131811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:36:10 2011
@@ -69,8 +69,7 @@ protected:
       case PROCESS_TIMEOUT: {
         terminate = true;
         DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
-        send(parent, 
-             pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
+        send(parent, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
         break;
       }
 
@@ -237,7 +236,8 @@ protected:
 	foreach(const TaskDescription &task, tasks) {
 	  RbReply *rr = new RbReply(self(), task.taskId);
 	  rbReplies[task.taskId] = rr;
-	  link(spawn(rr));
+	  // TODO(benh): Link?
+	  spawn(rr);
 	}
         
         send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
@@ -259,23 +259,34 @@ protected:
         break;
       }
 
-      case M2F_FT_STATUS_UPDATE: {
-        TaskID tid;
-        TaskState state;
-        string data;
-        unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
+	// TODO(benh): Fix forwarding issues.
+//       case M2F_FT_STATUS_UPDATE: {
+//         TaskID tid;
+//         TaskState state;
+//         string data;
+//         unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
+      case S2M_FT_STATUS_UPDATE: {
+	SlaveID sid;
+	FrameworkID fid;
+	TaskID tid;
+	TaskState state;
+	string data;
+
+	unpack<S2M_FT_STATUS_UPDATE>(sid, fid, tid, state, data);
+
         if (duplicate())
           break;
         ack();
         DLOG(INFO) << "FT: Received message with id: " << seq();
 
-        if (state == TASK_RUNNING) {
-          unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
-          if (it != rbReplies.end()) {
-            send(it->second->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
-            rbReplies.erase(tid);
-          }
-        }
+	unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+	if (it != rbReplies.end()) {
+	  RbReply *rr = it->second;
+	  send(rr->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+	  wait(rr->getPID());
+	  rbReplies.erase(tid);
+	  delete rr;
+	}
 
         TaskStatus status(tid, state, data);
         invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
@@ -287,13 +298,17 @@ protected:
         TaskState state;
         string data;
         unpack<M2F_STATUS_UPDATE>(tid, state, data);
-        TaskStatus status(tid, state, data);
 
-        unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
-        if (it != rbReplies.end() && it->second->getPID() == from()) { // clean rbReplies
-          rbReplies.erase(tid);
-        }
+	unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+	if (it != rbReplies.end()) {
+	  RbReply *rr = it->second;
+	  send(rr->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+	  wait(rr->getPID());
+	  rbReplies.erase(tid);
+	  delete rr;
+	}
 
+        TaskStatus status(tid, state, data);
         invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
         break;
       }