You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/08 20:45:48 UTC

[1/8] git commit: Introduced a version during slave (re-)registration.

Repository: mesos
Updated Branches:
  refs/heads/master 7c7f8dd19 -> e4bd79489


Introduced a version during slave (re-)registration.

Review: https://reviews.apache.org/r/26202


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c46ccca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c46ccca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c46ccca

Branch: refs/heads/master
Commit: 2c46ccca68841d2f6b475812817225f21630a5e5
Parents: 5da57a7
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 25 18:11:12 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp       | 50 ++++++++++++++++++++++++++++------------
 src/master/master.hpp       | 19 +++++++++++----
 src/messages/messages.proto | 10 ++++++++
 src/slave/slave.cpp         |  4 ++++
 4 files changed, 64 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index eb7b210..03881df 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -579,14 +579,16 @@ void Master::initialize()
 
   install<RegisterSlaveMessage>(
       &Master::registerSlave,
-      &RegisterSlaveMessage::slave);
+      &RegisterSlaveMessage::slave,
+      &RegisterSlaveMessage::version);
 
   install<ReregisterSlaveMessage>(
       &Master::reregisterSlave,
       &ReregisterSlaveMessage::slave,
       &ReregisterSlaveMessage::executor_infos,
       &ReregisterSlaveMessage::tasks,
-      &ReregisterSlaveMessage::completed_frameworks);
+      &ReregisterSlaveMessage::completed_frameworks,
+      &ReregisterSlaveMessage::version);
 
   install<UnregisterSlaveMessage>(
       &Master::unregisterSlave,
@@ -2876,7 +2878,10 @@ void Master::schedulerMessage(
 }
 
 
-void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
+void Master::registerSlave(
+    const UPID& from,
+    const SlaveInfo& slaveInfo,
+    const string& version)
 {
   ++metrics.messages_register_slave;
 
@@ -2885,7 +2890,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
               << " because authentication is still in progress";
 
     authenticating[from]
-      .onReady(defer(self(), &Self::registerSlave, from, slaveInfo));
+      .onReady(defer(self(), &Self::registerSlave, from, slaveInfo, version));
     return;
   }
 
@@ -2950,6 +2955,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
                  &Self::_registerSlave,
                  slaveInfo_,
                  from,
+                 version,
                  lambda::_1));
 }
 
@@ -2957,6 +2963,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
 void Master::_registerSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
+    const string& version,
     const Future<bool>& admit)
 {
   slaves.registering.erase(pid);
@@ -2980,7 +2987,11 @@ void Master::_registerSlave(
         stringify(slaveInfo.id()));
     send(pid, message);
   } else {
-    Slave* slave = new Slave(slaveInfo, pid, Clock::now());
+    Slave* slave = new Slave(
+        slaveInfo,
+        pid,
+        version.empty() ? Option<string>::none() : version,
+        Clock::now());
 
     LOG(INFO) << "Registered slave " << *slave;
     ++metrics.slave_registrations;
@@ -2995,7 +3006,8 @@ void Master::reregisterSlave(
     const SlaveInfo& slaveInfo,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
-    const vector<Archive::Framework>& completedFrameworks)
+    const vector<Archive::Framework>& completedFrameworks,
+    const string& version)
 {
   ++metrics.messages_reregister_slave;
 
@@ -3010,7 +3022,8 @@ void Master::reregisterSlave(
                      slaveInfo,
                      executorInfos,
                      tasks,
-                     completedFrameworks));
+                     completedFrameworks,
+                     version));
     return;
   }
 
@@ -3130,13 +3143,14 @@ void Master::reregisterSlave(
   // registrar.
   registrar->apply(Owned<Operation>(new ReadmitSlave(slaveInfo)))
     .onAny(defer(self(),
-           &Self::_reregisterSlave,
-           slaveInfo,
-           from,
-           executorInfos,
-           tasks,
-           completedFrameworks,
-           lambda::_1));
+                 &Self::_reregisterSlave,
+                 slaveInfo,
+                 from,
+                 executorInfos,
+                 tasks,
+                 completedFrameworks,
+                 version,
+                 lambda::_1));
 }
 
 
@@ -3146,6 +3160,7 @@ void Master::_reregisterSlave(
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
     const vector<Archive::Framework>& completedFrameworks,
+    const string& version,
     const Future<bool>& readmit)
 {
   slaves.reregistering.erase(slaveInfo.id());
@@ -3168,7 +3183,12 @@ void Master::_reregisterSlave(
     send(pid, message);
   } else {
     // Re-admission succeeded.
-    Slave* slave = new Slave(slaveInfo, pid, Clock::now());
+    Slave* slave = new Slave(
+        slaveInfo,
+        pid,
+        version.empty() ? Option<string>::none() : version,
+        Clock::now());
+
     slave->reregisteredTime = Clock::now();
 
     LOG(INFO) << "Re-registered slave " << *slave;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0f0b205..e97d213 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -157,13 +157,15 @@ public:
       const std::string& data);
   void registerSlave(
       const process::UPID& from,
-      const SlaveInfo& slaveInfo);
+      const SlaveInfo& slaveInfo,
+      const std::string& version);
   void reregisterSlave(
       const process::UPID& from,
       const SlaveInfo& slaveInfo,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
-      const std::vector<Archive::Framework>& completedFrameworks);
+      const std::vector<Archive::Framework>& completedFrameworks,
+      const std::string& version);
 
   void unregisterSlave(
       const process::UPID& from,
@@ -227,6 +229,7 @@ public:
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
       const std::vector<Archive::Framework>& completedFrameworks,
+      const std::string& version,
       const process::Future<bool>& readmit);
 
   MasterInfo info() const
@@ -267,6 +270,7 @@ protected:
   void _registerSlave(
       const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      const std::string& version,
       const process::Future<bool>& admit);
 
   void __reregisterSlave(
@@ -815,11 +819,13 @@ struct Slave
 {
   Slave(const SlaveInfo& _info,
         const process::UPID& _pid,
-        const process::Time& time)
+        const Option<std::string> _version,
+        const process::Time& _registeredTime)
     : id(_info.id()),
       info(_info),
       pid(_pid),
-      registeredTime(time),
+      version(_version),
+      registeredTime(_registeredTime),
       connected(true),
       active(true),
       observer(NULL)
@@ -954,6 +960,11 @@ struct Slave
 
   process::UPID pid;
 
+  // The Mesos version of the slave. If set, the slave is >= 0.21.0.
+  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
+  // TODO(bmahler): Make this required once it is always set.
+  const Option<std::string> version;
+
   process::Time registeredTime;
   Option<process::Time> reregisteredTime;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 16d9d67..b8039ef 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -227,6 +227,11 @@ message FrameworkErrorMessage {
 
 message RegisterSlaveMessage {
   required SlaveInfo slave = 1;
+
+  // NOTE: This is a hack for the master to detect the slave's
+  // version. If unset the slave is < 0.21.0.
+  // TODO(bmahler): Do proper versioning: MESOS-986.
+  optional string version = 2;
 }
 
 
@@ -239,6 +244,11 @@ message ReregisterSlaveMessage {
   repeated ExecutorInfo executor_infos = 4;
   repeated Task tasks = 3;
   repeated Archive.Framework completed_frameworks = 5;
+
+  // NOTE: This is a hack for the master to detect the slave's
+  // version. If unset the slave is < 0.21.0.
+  // TODO(bmahler): Do proper versioning: MESOS-986.
+  optional string version = 6;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ee3d649..809b008 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -882,11 +882,15 @@ void Slave::doReliableRegistration(const Duration& duration)
   if (!info.has_id()) {
     // Registering for the first time.
     RegisterSlaveMessage message;
+    message.set_version(MESOS_VERSION);
     message.mutable_slave()->CopyFrom(info);
+
     send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
+    message.set_version(MESOS_VERSION);
+
     // TODO(bmahler): Remove in 0.22.0.
     message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_slave()->CopyFrom(info);


[2/8] git commit: Added validation for missing ExecutorInfo::framework_id.

Posted by bm...@apache.org.
Added validation for missing ExecutorInfo::framework_id.

Review: https://reviews.apache.org/r/26200


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8f4dd637
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8f4dd637
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8f4dd637

Branch: refs/heads/master
Commit: 8f4dd6377f6a71405551bcc1822afa2e73815fcb
Parents: e1befdc
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 11:37:30 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 30 +++++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8f4dd637/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 79588da..a1716f4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1985,6 +1985,18 @@ struct ExecutorInfoChecker : TaskInfoVisitor
     }
 
     if (task.has_executor()) {
+      // The master currently expects ExecutorInfo.framework_id
+      // to be set even though it is an optional field.
+      // Currently, the scheduler driver ensures that the field
+      // is set. For schedulers not using the driver, we need to
+      // do the validation here.
+      // TODO(bmahler): Set this field in the master instead of
+      // depending on the scheduler driver do it.
+      if (!task.executor().has_framework_id()) {
+        return stringify(
+            "Task has invalid ExecutorInfo: missing field 'framework_id'");
+      }
+
       const ExecutorID& executorId = task.executor().executor_id();
       Option<ExecutorInfo> executorInfo = None();
 
@@ -2008,15 +2020,15 @@ struct ExecutorInfoChecker : TaskInfoVisitor
       }
 
       if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
-          return "Task has invalid ExecutorInfo (existing ExecutorInfo"
-              " with same ExecutorID is not compatible).\n"
-              "------------------------------------------------------------\n"
-              "Existing ExecutorInfo:\n" +
-              stringify(executorInfo.get()) + "\n"
-              "------------------------------------------------------------\n"
-              "Task's ExecutorInfo:\n" +
-              stringify(task.executor()) + "\n"
-              "------------------------------------------------------------\n";
+        return "Task has invalid ExecutorInfo (existing ExecutorInfo"
+               " with same ExecutorID is not compatible).\n"
+               "------------------------------------------------------------\n"
+               "Existing ExecutorInfo:\n" +
+               stringify(executorInfo.get()) + "\n"
+               "------------------------------------------------------------\n"
+               "Task's ExecutorInfo:\n" +
+               stringify(task.executor()) + "\n"
+               "------------------------------------------------------------\n";
       }
     }
 


[8/8] git commit: Cleaned up Master::addFramework.

Posted by bm...@apache.org.
Cleaned up Master::addFramework.

Review: https://reviews.apache.org/r/26205


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4bd7948
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4bd7948
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4bd7948

Branch: refs/heads/master
Commit: e4bd7948976499d9eb30ff0d6741c5df64faf349
Parents: a4a0d15
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:15:21 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e4bd7948/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 26cd29a..0286353 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1433,6 +1433,11 @@ void Master::_registerFramework(
   }
 
   addFramework(framework);
+
+  FrameworkRegisteredMessage message;
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  message.mutable_master_info()->MergeFrom(info_);
+  send(framework->pid, message);
 }
 
 
@@ -1630,6 +1635,15 @@ void Master::_reregisterFramework(
     // (above) so that we can properly determine the resources it's
     // currently using!
     addFramework(framework);
+
+    // TODO(bmahler): We have to send a registered message here for
+    // the re-registering framework, per the API contract. Send
+    // re-register here per MESOS-786; requires deprecation or it
+    // will break frameworks.
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id);
+    message.mutable_master_info()->MergeFrom(info_);
+    send(framework->pid, message);
   }
 
   CHECK(frameworks.registered.contains(frameworkInfo.id()))
@@ -3985,11 +3999,6 @@ void Master::addFramework(Framework* framework)
 
   roles[framework->info.role()]->addFramework(framework);
 
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  message.mutable_master_info()->MergeFrom(info_);
-  send(framework->pid, message);
-
   // There should be no offered resources yet!
   CHECK_EQ(Resources(), framework->offeredResources);
 


[6/8] git commit: Removed redundant logging in the Master.

Posted by bm...@apache.org.
Removed redundant logging in the Master.

Review: https://reviews.apache.org/r/26203


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ee4f879e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ee4f879e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ee4f879e

Branch: refs/heads/master
Commit: ee4f879e070e83d0bd2b2333e3055475c194b33e
Parents: 2c46ccc
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Sep 26 16:28:04 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ee4f879e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 03881df..e445c86 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,7 +2993,8 @@ void Master::_registerSlave(
         version.empty() ? Option<string>::none() : version,
         Clock::now());
 
-    LOG(INFO) << "Registered slave " << *slave;
+    LOG(INFO) << "Registered slave " << *slave
+              << " with " << slave->info.resources();
     ++metrics.slave_registrations;
 
     addSlave(slave);
@@ -3191,7 +3192,8 @@ void Master::_reregisterSlave(
 
     slave->reregisteredTime = Clock::now();
 
-    LOG(INFO) << "Re-registered slave " << *slave;
+    LOG(INFO) << "Re-registered slave " << *slave
+              << " with " << slave->info.resources();
     ++metrics.slave_reregistrations;
 
     readdSlave(slave, executorInfos, tasks, completedFrameworks);
@@ -4216,9 +4218,6 @@ void Master::addSlave(Slave* slave, bool reregister)
 {
   CHECK_NOTNULL(slave);
 
-  LOG(INFO) << "Adding slave " << *slave
-            << " with " << slave->info.resources();
-
   slaves.removed.erase(slave->id);
   slaves.registered[slave->id] = slave;
 


[7/8] git commit: Removed the need for Master::readdSlave.

Posted by bm...@apache.org.
Removed the need for Master::readdSlave.

Review: https://reviews.apache.org/r/26204


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4a0d158
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4a0d158
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4a0d158

Branch: refs/heads/master
Commit: a4a0d1580815360083a46b43d458c5babd58c632
Parents: ee4f879
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:07:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 105 +++++++++++++++++++--------------------------
 src/master/master.hpp |  24 ++++++++---
 2 files changed, 60 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e445c86..26cd29a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,11 +2993,16 @@ void Master::_registerSlave(
         version.empty() ? Option<string>::none() : version,
         Clock::now());
 
-    LOG(INFO) << "Registered slave " << *slave
-              << " with " << slave->info.resources();
     ++metrics.slave_registrations;
 
     addSlave(slave);
+
+    SlaveRegisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+
+    LOG(INFO) << "Registered slave " << *slave
+              << " with " << slave->info.resources();
   }
 }
 
@@ -3188,15 +3193,22 @@ void Master::_reregisterSlave(
         slaveInfo,
         pid,
         version.empty() ? Option<string>::none() : version,
-        Clock::now());
+        Clock::now(),
+        executorInfos,
+        tasks);
 
     slave->reregisteredTime = Clock::now();
 
-    LOG(INFO) << "Re-registered slave " << *slave
-              << " with " << slave->info.resources();
     ++metrics.slave_reregistrations;
 
-    readdSlave(slave, executorInfos, tasks, completedFrameworks);
+    addSlave(slave, completedFrameworks);
+
+    SlaveReregisteredMessage message;
+    message.mutable_slave_id()->MergeFrom(slave->id);
+    send(slave->pid, message);
+
+    LOG(INFO) << "Re-registered slave " << *slave
+              << " with " << slave->info.resources();
 
     __reregisterSlave(slave, tasks);
   }
@@ -4214,7 +4226,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 }
 
 
-void Master::addSlave(Slave* slave, bool reregister)
+void Master::addSlave(
+    Slave* slave,
+    const vector<Archive::Framework>& completedFrameworks)
 {
   CHECK_NOTNULL(slave);
 
@@ -4223,70 +4237,37 @@ void Master::addSlave(Slave* slave, bool reregister)
 
   link(slave->pid);
 
-  if (!reregister) {
-    SlaveRegisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(slave->pid, message);
-  } else {
-    SlaveReregisteredMessage message;
-    message.mutable_slave_id()->MergeFrom(slave->id);
-    send(slave->pid, message);
-  }
-
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
       slave->pid, slave->info, slave->id, self());
 
   spawn(slave->observer);
 
-  if (!reregister) {
-    allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
-  }
-}
-
-
-void Master::readdSlave(
-    Slave* slave,
-    const vector<ExecutorInfo>& executorInfos,
-    const vector<Task>& tasks,
-    const vector<Archive::Framework>& completedFrameworks)
-{
-  CHECK_NOTNULL(slave);
-
-  addSlave(slave, true);
-
-  foreach (const ExecutorInfo& executorInfo, executorInfos) {
-    // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
-    // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
-    // to set it, and we could remove this CHECK if desired.
-    CHECK(executorInfo.has_framework_id())
-      << "Executor " << executorInfo.executor_id()
-      << " doesn't have frameworkId set";
-
-    slave->addExecutor(executorInfo.framework_id(), executorInfo);
-
-    Framework* framework = getFramework(executorInfo.framework_id());
-    if (framework != NULL) { // The framework might not be re-registered yet.
-      framework->addExecutor(slave->id, executorInfo);
+  // Add the slave's executors to the frameworks.
+  foreachkey (const FrameworkID& frameworkId, slave->executors) {
+    foreachvalue (const ExecutorInfo& executorInfo,
+                  slave->executors[frameworkId]) {
+      Framework* framework = getFramework(frameworkId);
+      if (framework != NULL) { // The framework might not be re-registered yet.
+        framework->addExecutor(slave->id, executorInfo);
+      }
     }
   }
 
-  foreach (const Task& task, tasks) {
-    Task* t = new Task(task);
-
-    // Add the task to the slave.
-    slave->addTask(t);
-
-    Framework* framework = getFramework(task.framework_id());
-    if (framework != NULL) { // The framework might not be re-registered yet.
-      framework->addTask(t);
-    } else {
-      // TODO(benh): We should really put a timeout on how long we
-      // keep tasks running on a slave that never have frameworks
-      // reregister and claim them.
-      LOG(WARNING) << "Possibly orphaned task " << task.task_id()
-                   << " of framework " << task.framework_id()
-                   << " running on slave " << *slave;
+  // Add the slave's tasks to the frameworks.
+  foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+    foreachvalue (Task* task, slave->tasks[frameworkId]) {
+      Framework* framework = getFramework(task->framework_id());
+      if (framework != NULL) { // The framework might not be re-registered yet.
+        framework->addTask(task);
+      } else {
+        // TODO(benh): We should really put a timeout on how long we
+        // keep tasks running on a slave that never have frameworks
+        // reregister and claim them.
+        LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+                     << " of framework " << task->framework_id()
+                     << " running on slave " << *slave;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e97d213..37ce31a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -335,13 +335,10 @@ protected:
   void deactivate(Slave* slave);
 
   // Add a slave.
-  void addSlave(Slave* slave, bool reregister = false);
-
-  void readdSlave(
+  void addSlave(
       Slave* slave,
-      const std::vector<ExecutorInfo>& executorInfos,
-      const std::vector<Task>& tasks,
-      const std::vector<Archive::Framework>& completedFrameworks);
+      const std::vector<Archive::Framework>& completedFrameworks =
+        std::vector<Archive::Framework>());
 
   // Remove the slave from the registrar and from the master's state.
   void removeSlave(Slave* slave);
@@ -820,7 +817,11 @@ struct Slave
   Slave(const SlaveInfo& _info,
         const process::UPID& _pid,
         const Option<std::string> _version,
-        const process::Time& _registeredTime)
+        const process::Time& _registeredTime,
+        const std::vector<ExecutorInfo> executorInfos =
+          std::vector<ExecutorInfo>(),
+        const std::vector<Task> tasks =
+          std::vector<Task>())
     : id(_info.id()),
       info(_info),
       pid(_pid),
@@ -831,6 +832,15 @@ struct Slave
       observer(NULL)
   {
     CHECK(_info.has_id());
+
+    foreach (const ExecutorInfo& executorInfo, executorInfos) {
+      CHECK(executorInfo.has_framework_id());
+      addExecutor(executorInfo.framework_id(), executorInfo);
+    }
+
+    foreach (const Task& task, tasks) {
+      addTask(new Task(task));
+    }
   }
 
   ~Slave() {}


[3/8] git commit: Eliminated redundant resource accounting in the master.

Posted by bm...@apache.org.
Eliminated redundant resource accounting in the master.

Review: https://reviews.apache.org/r/26199


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1befdce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1befdce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1befdce

Branch: refs/heads/master
Commit: e1befdcee8edbab4ccb33d139f714b7e0e954dd8
Parents: c866484
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Sep 26 16:19:09 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  8 ++++---
 src/master/master.cpp | 24 ++++++-------------
 src/master/master.hpp | 60 +++++++++++++++++++++++++++++-----------------
 3 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 3fd4b45..a5e34cc 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -415,9 +415,11 @@ Future<Response> Master::Http::stats(const Request& request)
         totalResources += resource;
       }
     }
-    foreach (const Resource& resource, slave->usedResources) {
-      if (resource.type() == Value::SCALAR) {
-        usedResources += resource;
+    foreachvalue (const Resources& resources, slave->usedResources) {
+      foreach (const Resource& resource, resources) {
+        if (resource.type() == Value::SCALAR) {
+          usedResources += resource;
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e2c58d1..79588da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4212,8 +4212,7 @@ void Master::addSlave(Slave* slave, bool reregister)
   spawn(slave->observer);
 
   if (!reregister) {
-    allocator->slaveAdded(
-        slave->id, slave->info, hashmap<FrameworkID, Resources>());
+    allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
   }
 }
 
@@ -4228,10 +4227,6 @@ void Master::readdSlave(
 
   addSlave(slave, true);
 
-  // Add the executors and tasks to the slave and framework state and
-  // determine the resources that have been allocated to frameworks.
-  hashmap<FrameworkID, Resources> resources;
-
   foreach (const ExecutorInfo& executorInfo, executorInfos) {
     // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
     // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
@@ -4246,8 +4241,6 @@ void Master::readdSlave(
     if (framework != NULL) { // The framework might not be re-registered yet.
       framework->addExecutor(slave->id, executorInfo);
     }
-
-    resources[executorInfo.framework_id()] += executorInfo.resources();
   }
 
   foreach (const Task& task, tasks) {
@@ -4267,11 +4260,6 @@ void Master::readdSlave(
                    << " of framework " << task.framework_id()
                    << " running on slave " << *slave;
     }
-
-    // Terminal tasks do not consume resoures.
-    if (!protobuf::isTerminalState(task.state())) {
-      resources[task.framework_id()] += task.resources();
-    }
   }
 
   // Re-add completed tasks reported by the slave.
@@ -4300,7 +4288,7 @@ void Master::readdSlave(
     }
   }
 
-  allocator->slaveAdded(slave->id, slave->info, resources);
+  allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
 }
 
 
@@ -5146,9 +5134,11 @@ double Master::_resources_used(const std::string& name)
   double used = 0.0;
 
   foreachvalue (Slave* slave, slaves.registered) {
-    foreach (const Resource& resource, slave->usedResources) {
-      if (resource.name() == name && resource.type() == Value::SCALAR) {
-        used += resource.scalar().value();
+    foreachvalue (const Resources& resources, slave->usedResources) {
+      foreach (const Resource& resource, resources) {
+        if (resource.name() == name && resource.type() == Value::SCALAR) {
+          used += resource.scalar().value();
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5c0f224..5cafae3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -838,17 +838,19 @@ struct Slave
 
   void addTask(Task* task)
   {
-    CHECK(!tasks[task->framework_id()].contains(task->task_id()))
-      << "Duplicate task " << task->task_id()
-      << " of framework " << task->framework_id();
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
+
+    CHECK(!tasks[frameworkId].contains(taskId))
+      << "Duplicate task " << taskId << " of framework " << frameworkId;
 
-    tasks[task->framework_id()][task->task_id()] = task;
+    tasks[frameworkId][taskId] = task;
 
     if (!protobuf::isTerminalState(task->state())) {
-      usedResources += task->resources();
+      usedResources[frameworkId] += task->resources();
     }
 
-    LOG(INFO) << "Adding task " << task->task_id()
+    LOG(INFO) << "Adding task " << taskId
               << " with resources " << task->resources()
               << " on slave " << id << " (" << info.hostname() << ")";
   }
@@ -859,30 +861,40 @@ struct Slave
   // functionally for all tasks is expensive, for now.
   void taskTerminated(Task* task)
   {
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
+
     CHECK(protobuf::isTerminalState(task->state()));
-    CHECK(tasks[task->framework_id()].contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
+    CHECK(tasks[frameworkId].contains(taskId))
+      << "Unknown task " << taskId << " of framework " << frameworkId;
 
-    usedResources -= task->resources();
+    usedResources[frameworkId] -= task->resources();
+    if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+      usedResources.erase(frameworkId);
+    }
   }
 
   void removeTask(Task* task)
   {
-    CHECK(tasks[task->framework_id()].contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
+    const TaskID& taskId = task->task_id();
+    const FrameworkID& frameworkId = task->framework_id();
+
+    CHECK(tasks[frameworkId].contains(taskId))
+      << "Unknown task " << taskId << " of framework " << frameworkId;
 
     if (!protobuf::isTerminalState(task->state())) {
-      usedResources -= task->resources();
+      usedResources[frameworkId] -= task->resources();
+      if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+        usedResources.erase(frameworkId);
+      }
     }
 
-    tasks[task->framework_id()].erase(task->task_id());
-    if (tasks[task->framework_id()].empty()) {
-      tasks.erase(task->framework_id());
+    tasks[frameworkId].erase(taskId);
+    if (tasks[frameworkId].empty()) {
+      tasks.erase(frameworkId);
     }
 
-    killedTasks.remove(task->framework_id(), task->task_id());
+    killedTasks.remove(frameworkId, taskId);
   }
 
   void addOffer(Offer* offer)
@@ -916,7 +928,7 @@ struct Slave
       << " of framework " << frameworkId;
 
     executors[frameworkId][executorInfo.executor_id()] = executorInfo;
-    usedResources += executorInfo.resources();
+    usedResources[frameworkId] += executorInfo.resources();
   }
 
   void removeExecutor(const FrameworkID& frameworkId,
@@ -925,7 +937,11 @@ struct Slave
     CHECK(hasExecutor(frameworkId, executorId))
       << "Unknown executor " << executorId << " of framework " << frameworkId;
 
-    usedResources -= executors[frameworkId][executorId].resources();
+    usedResources[frameworkId] -=
+      executors[frameworkId][executorId].resources();
+
+    // XXX Remove. (Presumably: erase usedResources[frameworkId] when unused.)
+
     executors[frameworkId].erase(executorId);
     if (executors[frameworkId].empty()) {
       executors.erase(frameworkId);
@@ -964,8 +980,8 @@ struct Slave
   // Active offers on this slave.
   hashset<Offer*> offers;
 
-  Resources usedResources;    // Active task / executor resources.
-  Resources offeredResources; // Offered resources.
+  hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
+  Resources offeredResources; // Offers.
 
   SlaveObserver* observer;
 


[4/8] git commit: Properly deprecated ReregisterSlaveMessage::slave_id.

Posted by bm...@apache.org.
Properly deprecated ReregisterSlaveMessage::slave_id.

Review: https://reviews.apache.org/r/26201


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5da57a7e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5da57a7e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5da57a7e

Branch: refs/heads/master
Commit: 5da57a7ef769d12bfbdb64f365b409853fdd9935
Parents: 8f4dd63
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 25 16:59:19 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp       | 9 +++------
 src/master/master.hpp       | 9 +++++----
 src/messages/messages.proto | 7 ++++---
 src/slave/slave.cpp         | 4 ++--
 4 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a1716f4..eb7b210 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -583,7 +583,6 @@ void Master::initialize()
 
   install<ReregisterSlaveMessage>(
       &Master::reregisterSlave,
-      &ReregisterSlaveMessage::slave_id,
       &ReregisterSlaveMessage::slave,
       &ReregisterSlaveMessage::executor_infos,
       &ReregisterSlaveMessage::tasks,
@@ -2981,7 +2980,7 @@ void Master::_registerSlave(
         stringify(slaveInfo.id()));
     send(pid, message);
   } else {
-    Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
+    Slave* slave = new Slave(slaveInfo, pid, Clock::now());
 
     LOG(INFO) << "Registered slave " << *slave;
     ++metrics.slave_registrations;
@@ -2993,7 +2992,6 @@ void Master::_registerSlave(
 
 void Master::reregisterSlave(
     const UPID& from,
-    const SlaveID& slaveId,
     const SlaveInfo& slaveInfo,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
@@ -3009,7 +3007,6 @@ void Master::reregisterSlave(
       .onReady(defer(self(),
                      &Self::reregisterSlave,
                      from,
-                     slaveId,
                      slaveInfo,
                      executorInfos,
                      tasks,
@@ -3036,7 +3033,7 @@ void Master::reregisterSlave(
     // re-registering. This is because a non-strict registrar cannot
     // enforce this. We've already told frameworks the tasks were
     // lost so it's important to deny the slave from re-registering.
-    LOG(WARNING) << "Slave " << slaveId << " at " << from
+    LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from
                  << " (" << slaveInfo.hostname() << ") attempted to "
                  << "re-register after removal; shutting it down";
 
@@ -3171,7 +3168,7 @@ void Master::_reregisterSlave(
     send(pid, message);
   } else {
     // Re-admission succeeded.
-    Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
+    Slave* slave = new Slave(slaveInfo, pid, Clock::now());
     slave->reregisteredTime = Clock::now();
 
     LOG(INFO) << "Re-registered slave " << *slave;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5cafae3..0f0b205 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -160,7 +160,6 @@ public:
       const SlaveInfo& slaveInfo);
   void reregisterSlave(
       const process::UPID& from,
-      const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
@@ -815,16 +814,18 @@ private:
 struct Slave
 {
   Slave(const SlaveInfo& _info,
-        const SlaveID& _id,
         const process::UPID& _pid,
         const process::Time& time)
-    : id(_id),
+    : id(_info.id()),
       info(_info),
       pid(_pid),
       registeredTime(time),
       connected(true),
       active(true),
-      observer(NULL) {}
+      observer(NULL)
+  {
+    CHECK(_info.has_id());
+  }
 
   ~Slave() {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 9ff06b3..16d9d67 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -231,9 +231,10 @@ message RegisterSlaveMessage {
 
 
 message ReregisterSlaveMessage {
-  // TODO(bmahler): Deprecate and remove the explicit slave_id as
-  // SlaveInfo already includes this information.
-  required SlaveID slave_id = 1;
+  // TODO(bmahler): slave_id is deprecated.
+  // 0.21.0: Now an optional field. Always written, never read.
+  // 0.22.0: Remove this field.
+  optional SlaveID slave_id = 1;
   required SlaveInfo slave = 2;
   repeated ExecutorInfo executor_infos = 4;
   repeated Task tasks = 3;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 853c350..ee3d649 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -879,7 +879,7 @@ void Slave::doReliableRegistration(const Duration& duration)
 
   CHECK(state == DISCONNECTED) << state;
 
-  if (info.id() == "") {
+  if (!info.has_id()) {
     // Registering for the first time.
     RegisterSlaveMessage message;
     message.mutable_slave()->CopyFrom(info);
@@ -887,7 +887,7 @@ void Slave::doReliableRegistration(const Duration& duration)
   } else {
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
-    // TODO(bmahler): deprecate this.
+    // TODO(bmahler): Remove in 0.22.0.
     message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_slave()->CopyFrom(info);
 


[5/8] git commit: Fixed a log line in the master.

Posted by bm...@apache.org.
Fixed a log line in the master.

Review: https://reviews.apache.org/r/26198


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c8664847
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c8664847
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c8664847

Branch: refs/heads/master
Commit: c86648479476c556b9fb2f5a710af258991ba6e7
Parents: 7c7f8dd
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 12:23:12 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c8664847/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 86dc544..e2c58d1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3929,7 +3929,7 @@ void Master::reconcile(
 void Master::addFramework(Framework* framework)
 {
   CHECK(!frameworks.registered.contains(framework->id))
-    << "Framework " << framework->id << "already exists!";
+    << "Framework " << framework->id << " already exists!";
 
   frameworks.registered[framework->id] = framework;