You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:38:15 UTC
svn commit: r1131829 - in /incubator/mesos/trunk/src: master.cpp master.hpp
tests/test_master.cpp
Author: benh
Date: Sun Jun 5 05:38:15 2011
New Revision: 1131829
URL: http://svn.apache.org/viewvc?rev=1131829&view=rev
Log:
Fixed a bug in scheduler failover, which led to some refactoring to share code.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/master.hpp
incubator/mesos/trunk/src/tests/test_master.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:38:15 2011
@@ -298,13 +298,7 @@ void Master::operator () ()
framework->user,
framework->executorInfo);
LOG(INFO) << "Registering " << framework << " at " << framework->pid;
- frameworks[fid] = framework;
- pidToFid[framework->pid] = fid;
- link(framework->pid);
- send(framework->pid, pack<M2F_REGISTER_REPLY>(fid));
- allocator->frameworkAdded(framework);
- if (framework->executorInfo.uri == "")
- terminateFramework(framework, 1, "No executor URI given");
+ addFramework(framework);
break;
}
@@ -320,31 +314,25 @@ void Master::operator () ()
if (framework->id == "") {
LOG(ERROR) << "Framework reconnect/failover without an id!";
send(framework->pid, pack<M2F_ERROR>(1, "Missing framework id"));
+ delete framework;
break;
}
LOG(INFO) << "Reregistering " << framework << " at " << framework->pid;
- if (frameworks[framework->id] != NULL) {
+ if (frameworks.find(framework->id) != frameworks.end()) {
if (failover) {
- terminateFramework(frameworks[framework->id], 1, "Failover");
+ replaceFramework(frameworks[framework->id], framework);
} else {
LOG(INFO) << "Framework reregistering with an already used id!";
send(framework->pid, pack<M2F_ERROR>(1, "Framework id in use"));
+ delete framework;
break;
}
+ } else {
+ addFramework(framework);
+ updateFrameworkTasks();
}
-
- frameworks[framework->id] = framework;
- pidToFid[framework->pid] = framework->id;
-
- updateFrameworkTasks();
-
- link(framework->pid);
- send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
- allocator->frameworkAdded(framework);
- if (framework->executorInfo.uri == "")
- terminateFramework(framework, 1, "No executor URI given");
break;
}
@@ -856,6 +844,51 @@ void Master::removeSlotOffer(SlotOffer *
}
+// Register a new framework with the master and send its scheduler the register reply.
+void Master::addFramework(Framework *framework)
+{
+ CHECK(frameworks.find(framework->id) == frameworks.end()); // The id must not already be registered.
+ frameworks[framework->id] = framework; // Index the framework by its id...
+ pidToFid[framework->pid] = framework->id; // ...and map its pid back to that id.
+ link(framework->pid); // NOTE(review): presumably lets the master notice the scheduler exiting -- confirm.
+ send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id)); // The reply carries the framework id.
+ allocator->frameworkAdded(framework);
+ if (framework->executorInfo.uri == "") // Checked only after the reply above has already been sent.
+ terminateFramework(framework, 1, "No executor URI given");
+}
+
+// Failover: tear down 'old' and register 'current' in its place under the same framework id.
+void Master::replaceFramework(Framework *old, Framework *current)
+{
+ CHECK(old->id == current->id); // Both objects must carry the same framework id.
+
+ old->active = false;
+ // TODO: Notify allocator that a framework removal is beginning?
+
+ // Remove the old framework's outstanding slot offers, tagged with the failover reason.
+ unordered_set<SlotOffer *> slotOffersCopy = old->slotOffers; // NOTE(review): copied presumably because removeSlotOffer mutates old->slotOffers -- confirm.
+ foreach (SlotOffer* offer, slotOffersCopy) {
+ removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+ }
+
+ send(old->pid, pack<M2F_ERROR>(1, "Framework failover")); // Notify the old scheduler via an error message.
+
+ // TODO(benh): Similar code between removeFramework and
+ // replaceFramework needs to be shared!
+
+ // TODO(benh): unlink(old->pid);
+ pidToFid.erase(old->pid); // Drop the old pid -> id mapping.
+
+ // Unregister the old object first: addFramework() CHECKs that the id is not registered.
+ frameworks.erase(old->id);
+ allocator->frameworkRemoved(old); // The allocator is notified before 'old' is freed.
+ delete old;
+
+ addFramework(current); // Registers 'current' and sends it the register reply.
+ updateFrameworkTasks(); // NOTE(review): presumably re-associates existing tasks with 'current' -- confirm.
+}
+
+
// Kill all of a framework's tasks, delete the framework object, and
// reschedule slot offers for slots that were assigned to this framework
void Master::removeFramework(Framework *framework)
@@ -881,6 +914,9 @@ void Master::removeFramework(Framework *
removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
}
+ // TODO(benh): Similar code between removeFramework and
+ // replaceFramework needs to be shared!
+
// TODO(benh): unlink(framework->pid);
pidToFid.erase(framework->pid);
Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun 5 05:38:15 2011
@@ -246,6 +246,7 @@ enum OfferReturnReason
ORR_FRAMEWORK_REPLIED,
ORR_OFFER_RESCINDED,
ORR_FRAMEWORK_LOST,
+ ORR_FRAMEWORK_FAILOVER,
ORR_SLAVE_LOST
};
@@ -342,6 +343,10 @@ protected:
void removeTask(Task *task, TaskRemovalReason reason);
+ void addFramework(Framework *framework);
+
+ void replaceFramework(Framework *old, Framework *current);
+
// Kill all of a framework's tasks, delete the framework object, and
// reschedule slot offers for slots that were assigned to this framework
void removeFramework(Framework *framework);
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 05:38:15 2011
@@ -413,7 +413,7 @@ TEST(MasterTest, SchedulerFailover)
NexusSchedulerDriver driver(&failingSched, master);
driver.run();
- EXPECT_EQ("Failover", failingSched.errorMessage);
+ EXPECT_EQ("Framework failover", failingSched.errorMessage);
failingSched.driver->join();