You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:36:10 UTC
svn commit: r1131811 - in /incubator/mesos/trunk/src: master.cpp nexus_sched.cpp
Author: benh
Date: Sun Jun 5 05:36:10 2011
New Revision: 1131811
URL: http://svn.apache.org/viewvc?rev=1131811&view=rev
Log:
Fixing regressions introduced by porting to ReliableProcess.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/nexus_sched.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131811&r1=1131810&r2=1131811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:36:10 2011
@@ -633,8 +633,8 @@ void Master::operator () ()
FrameworkID fid = pidToFid[from()];
if (Framework *framework = lookupFramework(fid)) {
LOG(INFO) << framework << " disconnected";
- // TODO(benh): Only wait a specified timeout for another
- // scheduler for this framework to reconnect.
+ // TODO(benh): Wait for a framework failover.
+ removeFramework(framework);
}
} else if (pidToSid.find(from()) != pidToSid.end()) {
SlaveID sid = pidToSid[from()];
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131811&r1=1131810&r2=1131811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:36:10 2011
@@ -69,8 +69,7 @@ protected:
case PROCESS_TIMEOUT: {
terminate = true;
DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
- send(parent,
- pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
+ send(parent, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
break;
}
@@ -237,7 +236,8 @@ protected:
foreach(const TaskDescription &task, tasks) {
RbReply *rr = new RbReply(self(), task.taskId);
rbReplies[task.taskId] = rr;
- link(spawn(rr));
+ // TODO(benh): Link?
+ spawn(rr);
}
send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
@@ -259,23 +259,34 @@ protected:
break;
}
- case M2F_FT_STATUS_UPDATE: {
- TaskID tid;
- TaskState state;
- string data;
- unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
+ // TODO(benh): Fix forwarding issues.
+// case M2F_FT_STATUS_UPDATE: {
+// TaskID tid;
+// TaskState state;
+// string data;
+// unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
+ case S2M_FT_STATUS_UPDATE: {
+ SlaveID sid;
+ FrameworkID fid;
+ TaskID tid;
+ TaskState state;
+ string data;
+
+ unpack<S2M_FT_STATUS_UPDATE>(sid, fid, tid, state, data);
+
if (duplicate())
break;
ack();
DLOG(INFO) << "FT: Received message with id: " << seq();
- if (state == TASK_RUNNING) {
- unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
- if (it != rbReplies.end()) {
- send(it->second->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
- rbReplies.erase(tid);
- }
- }
+ unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+ if (it != rbReplies.end()) {
+ RbReply *rr = it->second;
+ send(rr->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+ wait(rr->getPID());
+ rbReplies.erase(tid);
+ delete rr;
+ }
TaskStatus status(tid, state, data);
invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
@@ -287,13 +298,17 @@ protected:
TaskState state;
string data;
unpack<M2F_STATUS_UPDATE>(tid, state, data);
- TaskStatus status(tid, state, data);
- unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
- if (it != rbReplies.end() && it->second->getPID() == from()) { // clean rbReplies
- rbReplies.erase(tid);
- }
+ unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+ if (it != rbReplies.end()) {
+ RbReply *rr = it->second;
+ send(rr->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+ wait(rr->getPID());
+ rbReplies.erase(tid);
+ delete rr;
+ }
+ TaskStatus status(tid, state, data);
invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
break;
}