You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:38:15 UTC

svn commit: r1131829 - in /incubator/mesos/trunk/src: master.cpp master.hpp tests/test_master.cpp

Author: benh
Date: Sun Jun  5 05:38:15 2011
New Revision: 1131829

URL: http://svn.apache.org/viewvc?rev=1131829&view=rev
Log:
Fixed a bug in scheduler failover, which led to some refactoring to share code.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/master.hpp
    incubator/mesos/trunk/src/tests/test_master.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:38:15 2011
@@ -298,13 +298,7 @@ void Master::operator () ()
 				     framework->user,
 				     framework->executorInfo);
       LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-      frameworks[fid] = framework;
-      pidToFid[framework->pid] = fid;
-      link(framework->pid);
-      send(framework->pid, pack<M2F_REGISTER_REPLY>(fid));
-      allocator->frameworkAdded(framework);
-      if (framework->executorInfo.uri == "")
-        terminateFramework(framework, 1, "No executor URI given");
+      addFramework(framework);
       break;
     }
 
@@ -320,31 +314,25 @@ void Master::operator () ()
       if (framework->id == "") {
 	LOG(ERROR) << "Framework reconnect/failover without an id!";
 	send(framework->pid, pack<M2F_ERROR>(1, "Missing framework id"));
+	delete framework;
 	break;
       }
 
       LOG(INFO) << "Reregistering " << framework << " at " << framework->pid;
 
-      if (frameworks[framework->id] != NULL) {
+      if (frameworks.find(framework->id) != frameworks.end()) {
 	if (failover) {
-	  terminateFramework(frameworks[framework->id], 1, "Failover");
+	  replaceFramework(frameworks[framework->id], framework);
 	} else {
 	  LOG(INFO) << "Framework reregistering with an already used id!";
 	  send(framework->pid, pack<M2F_ERROR>(1, "Framework id in use"));
+	  delete framework;
 	  break;
 	}
+      } else {
+	addFramework(framework);
+	updateFrameworkTasks();
       }
-
-      frameworks[framework->id] = framework;
-      pidToFid[framework->pid] = framework->id;
-
-      updateFrameworkTasks();
-
-      link(framework->pid);
-      send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
-      allocator->frameworkAdded(framework);
-      if (framework->executorInfo.uri == "")
-        terminateFramework(framework, 1, "No executor URI given");
       break;
     }
 
@@ -856,6 +844,51 @@ void Master::removeSlotOffer(SlotOffer *
 }
 
 
+
+void Master::addFramework(Framework *framework)
+{
+  CHECK(frameworks.find(framework->id) == frameworks.end());
+  frameworks[framework->id] = framework;
+  pidToFid[framework->pid] = framework->id;
+  link(framework->pid);
+  send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
+  allocator->frameworkAdded(framework);
+  if (framework->executorInfo.uri == "")
+    terminateFramework(framework, 1, "No executor URI given");
+}
+
+
+void Master::replaceFramework(Framework *old, Framework *current)
+{
+  CHECK(old->id == current->id);
+
+  old->active = false;
+  // TODO: Notify allocator that a framework removal is beginning?
+  
+  // Remove the framework's slot offers
+  unordered_set<SlotOffer *> slotOffersCopy = old->slotOffers;
+  foreach (SlotOffer* offer, slotOffersCopy) {
+    removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+  }
+
+  send(old->pid, pack<M2F_ERROR>(1, "Framework failover"));
+
+  // TODO(benh): Similar code between removeFramework and
+  // replaceFramework needs to be shared!
+
+  // TODO(benh): unlink(old->pid);
+  pidToFid.erase(old->pid);
+
+  // Delete it
+  frameworks.erase(old->id);
+  allocator->frameworkRemoved(old);
+  delete old;
+
+  addFramework(current);
+  updateFrameworkTasks();
+}
+
+
 // Kill all of a framework's tasks, delete the framework object, and
 // reschedule slot offers for slots that were assigned to this framework
 void Master::removeFramework(Framework *framework)
@@ -881,6 +914,9 @@ void Master::removeFramework(Framework *
     removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
   }
 
+  // TODO(benh): Similar code between removeFramework and
+  // replaceFramework needs to be shared!
+
   // TODO(benh): unlink(framework->pid);
   pidToFid.erase(framework->pid);
 

Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun  5 05:38:15 2011
@@ -246,6 +246,7 @@ enum OfferReturnReason
   ORR_FRAMEWORK_REPLIED,
   ORR_OFFER_RESCINDED,
   ORR_FRAMEWORK_LOST,
+  ORR_FRAMEWORK_FAILOVER,
   ORR_SLAVE_LOST
 };
 
@@ -342,6 +343,10 @@ protected:
 
   void removeTask(Task *task, TaskRemovalReason reason);
 
+  void addFramework(Framework *framework);
+
+  void replaceFramework(Framework *old, Framework *current);
+
   // Kill all of a framework's tasks, delete the framework object, and
   // reschedule slot offers for slots that were assigned to this framework
   void removeFramework(Framework *framework);

Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131829&r1=1131828&r2=1131829&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun  5 05:38:15 2011
@@ -413,7 +413,7 @@ TEST(MasterTest, SchedulerFailover)
   NexusSchedulerDriver driver(&failingSched, master);
   driver.run();
 
-  EXPECT_EQ("Failover", failingSched.errorMessage);
+  EXPECT_EQ("Framework failover", failingSched.errorMessage);
 
   failingSched.driver->join();