You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/08 20:45:54 UTC

[7/8] git commit: Removed the need for Master::readdSlave.

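Removed the need for Master::readdSlave.

The Slave constructor now accepts the slave's executors and tasks and
adds them itself, and Master::addSlave takes the completed frameworks
and adds the slave's executors and tasks to their frameworks, so
Master::readdSlave is deleted. The SlaveRegisteredMessage /
SlaveReregisteredMessage is now sent by _registerSlave /
_reregisterSlave after calling addSlave, rather than from inside
addSlave.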

Review: https://reviews.apache.org/r/26204


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4a0d158
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4a0d158
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4a0d158

Branch: refs/heads/master
Commit: a4a0d1580815360083a46b43d458c5babd58c632
Parents: ee4f879
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:07:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 105 +++++++++++++++++++--------------------------
 src/master/master.hpp |  24 ++++++++---
 2 files changed, 60 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e445c86..26cd29a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,11 +2993,16 @@ void Master::_registerSlave(
         version.empty() ? Option<string>::none() : version,
         Clock::now());
 
-    LOG(INFO) << "Registered slave " << *slave
-              << " with " << slave->info.resources();
     ++metrics.slave_registrations;
 
     addSlave(slave);
+
+    SlaveRegisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+
+    LOG(INFO) << "Registered slave " << *slave
+              << " with " << slave->info.resources();
   }
 }
 
@@ -3188,15 +3193,22 @@ void Master::_reregisterSlave(
         slaveInfo,
         pid,
         version.empty() ? Option<string>::none() : version,
-        Clock::now());
+        Clock::now(),
+        executorInfos,
+        tasks);
 
     slave->reregisteredTime = Clock::now();
 
-    LOG(INFO) << "Re-registered slave " << *slave
-              << " with " << slave->info.resources();
     ++metrics.slave_reregistrations;
 
-    readdSlave(slave, executorInfos, tasks, completedFrameworks);
+    addSlave(slave, completedFrameworks);
+
+    SlaveReregisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+
+    LOG(INFO) << "Re-registered slave " << *slave
+              << " with " << slave->info.resources();
 
     __reregisterSlave(slave, tasks);
   }
@@ -4214,7 +4226,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 }
 
 
-void Master::addSlave(Slave* slave, bool reregister)
+void Master::addSlave(
+    Slave* slave,
+    const vector<Archive::Framework>& completedFrameworks)
 {
   CHECK_NOTNULL(slave);
 
@@ -4223,70 +4237,37 @@ void Master::addSlave(Slave* slave, bool reregister)
 
   link(slave->pid);
 
-  if (!reregister) {
-    SlaveRegisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(slave->pid, message);
-  } else {
-    SlaveReregisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(slave->pid, message);
-  }
-
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
       slave->pid, slave->info, slave->id, self());
 
   spawn(slave->observer);
 
-  if (!reregister) {
-    allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
-  }
-}
-
-
-void Master::readdSlave(
-    Slave* slave,
-    const vector<ExecutorInfo>& executorInfos,
-    const vector<Task>& tasks,
-    const vector<Archive::Framework>& completedFrameworks)
-{
-  CHECK_NOTNULL(slave);
-
-  addSlave(slave, true);
-
-  foreach (const ExecutorInfo& executorInfo, executorInfos) {
-    // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
-    // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
-    // to set it, and we could remove this CHECK if desired.
-    CHECK(executorInfo.has_framework_id())
-      << "Executor " << executorInfo.executor_id()
-      << " doesn't have frameworkId set";
-
-    slave->addExecutor(executorInfo.framework_id(), executorInfo);
-
-    Framework* framework = getFramework(executorInfo.framework_id());
-    if (framework != NULL) { // The framework might not be re-registered yet.
-      framework->addExecutor(slave->id, executorInfo);
+  // Add the slave's executors to the frameworks.
+  foreachkey (const FrameworkID& frameworkId, slave->executors) {
+    foreachvalue (const ExecutorInfo& executorInfo,
+                  slave->executors[frameworkId]) {
+      Framework* framework = getFramework(frameworkId);
+      if (framework != NULL) { // The framework might not be re-registered yet.
+        framework->addExecutor(slave->id, executorInfo);
+      }
     }
   }
 
-  foreach (const Task& task, tasks) {
-    Task* t = new Task(task);
-
-    // Add the task to the slave.
-    slave->addTask(t);
-
-    Framework* framework = getFramework(task.framework_id());
-    if (framework != NULL) { // The framework might not be re-registered yet.
-      framework->addTask(t);
-    } else {
-      // TODO(benh): We should really put a timeout on how long we
-      // keep tasks running on a slave that never have frameworks
-      // reregister and claim them.
-      LOG(WARNING) << "Possibly orphaned task " << task.task_id()
-                   << " of framework " << task.framework_id()
-                   << " running on slave " << *slave;
+  // Add the slave's tasks to the frameworks.
+  foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+    foreachvalue (Task* task, slave->tasks[frameworkId]) {
+      Framework* framework = getFramework(task->framework_id());
+      if (framework != NULL) { // The framework might not be re-registered yet.
+        framework->addTask(task);
+      } else {
+        // TODO(benh): We should really put a timeout on how long we
+        // keep tasks running on a slave that never have frameworks
+        // reregister and claim them.
+        LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+                     << " of framework " << task->framework_id()
+                     << " running on slave " << *slave;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e97d213..37ce31a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -335,13 +335,10 @@ protected:
   void deactivate(Slave* slave);
 
   // Add a slave.
-  void addSlave(Slave* slave, bool reregister = false);
-
-  void readdSlave(
+  void addSlave(
       Slave* slave,
-      const std::vector<ExecutorInfo>& executorInfos,
-      const std::vector<Task>& tasks,
-      const std::vector<Archive::Framework>& completedFrameworks);
+      const std::vector<Archive::Framework>& completedFrameworks =
+        std::vector<Archive::Framework>());
 
   // Remove the slave from the registrar and from the master's state.
   void removeSlave(Slave* slave);
@@ -820,7 +817,11 @@ struct Slave
   Slave(const SlaveInfo& _info,
         const process::UPID& _pid,
         const Option<std::string> _version,
-        const process::Time& _registeredTime)
+        const process::Time& _registeredTime,
+        const std::vector<ExecutorInfo> executorInfos =
+          std::vector<ExecutorInfo>(),
+        const std::vector<Task> tasks =
+          std::vector<Task>())
     : id(_info.id()),
       info(_info),
       pid(_pid),
@@ -831,6 +832,15 @@ struct Slave
       observer(NULL)
   {
     CHECK(_info.has_id());
+
+    foreach (const ExecutorInfo& executorInfo, executorInfos) {
+      CHECK(executorInfo.has_framework_id());
+      addExecutor(executorInfo.framework_id(), executorInfo);
+    }
+
+    foreach (const Task& task, tasks) {
+      addTask(new Task(task));
+    }
   }
 
   ~Slave() {}