You are viewing a plain text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/08 20:45:48 UTC
[1/8] git commit: Introduced a version during slave (re-)registration.
Repository: mesos
Updated Branches:
refs/heads/master 7c7f8dd19 -> e4bd79489
Introduced a version during slave (re-)registration.
Review: https://reviews.apache.org/r/26202
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c46ccca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c46ccca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c46ccca
Branch: refs/heads/master
Commit: 2c46ccca68841d2f6b475812817225f21630a5e5
Parents: 5da57a7
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 25 18:11:12 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 50 ++++++++++++++++++++++++++++------------
src/master/master.hpp | 19 +++++++++++----
src/messages/messages.proto | 10 ++++++++
src/slave/slave.cpp | 4 ++++
4 files changed, 64 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index eb7b210..03881df 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -579,14 +579,16 @@ void Master::initialize()
install<RegisterSlaveMessage>(
&Master::registerSlave,
- &RegisterSlaveMessage::slave);
+ &RegisterSlaveMessage::slave,
+ &RegisterSlaveMessage::version);
install<ReregisterSlaveMessage>(
&Master::reregisterSlave,
&ReregisterSlaveMessage::slave,
&ReregisterSlaveMessage::executor_infos,
&ReregisterSlaveMessage::tasks,
- &ReregisterSlaveMessage::completed_frameworks);
+ &ReregisterSlaveMessage::completed_frameworks,
+ &ReregisterSlaveMessage::version);
install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
@@ -2876,7 +2878,10 @@ void Master::schedulerMessage(
}
-void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
+void Master::registerSlave(
+ const UPID& from,
+ const SlaveInfo& slaveInfo,
+ const string& version)
{
++metrics.messages_register_slave;
@@ -2885,7 +2890,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
<< " because authentication is still in progress";
authenticating[from]
- .onReady(defer(self(), &Self::registerSlave, from, slaveInfo));
+ .onReady(defer(self(), &Self::registerSlave, from, slaveInfo, version));
return;
}
@@ -2950,6 +2955,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
&Self::_registerSlave,
slaveInfo_,
from,
+ version,
lambda::_1));
}
@@ -2957,6 +2963,7 @@ void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
void Master::_registerSlave(
const SlaveInfo& slaveInfo,
const UPID& pid,
+ const string& version,
const Future<bool>& admit)
{
slaves.registering.erase(pid);
@@ -2980,7 +2987,11 @@ void Master::_registerSlave(
stringify(slaveInfo.id()));
send(pid, message);
} else {
- Slave* slave = new Slave(slaveInfo, pid, Clock::now());
+ Slave* slave = new Slave(
+ slaveInfo,
+ pid,
+ version.empty() ? Option<string>::none() : version,
+ Clock::now());
LOG(INFO) << "Registered slave " << *slave;
++metrics.slave_registrations;
@@ -2995,7 +3006,8 @@ void Master::reregisterSlave(
const SlaveInfo& slaveInfo,
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
- const vector<Archive::Framework>& completedFrameworks)
+ const vector<Archive::Framework>& completedFrameworks,
+ const string& version)
{
++metrics.messages_reregister_slave;
@@ -3010,7 +3022,8 @@ void Master::reregisterSlave(
slaveInfo,
executorInfos,
tasks,
- completedFrameworks));
+ completedFrameworks,
+ version));
return;
}
@@ -3130,13 +3143,14 @@ void Master::reregisterSlave(
// registrar.
registrar->apply(Owned<Operation>(new ReadmitSlave(slaveInfo)))
.onAny(defer(self(),
- &Self::_reregisterSlave,
- slaveInfo,
- from,
- executorInfos,
- tasks,
- completedFrameworks,
- lambda::_1));
+ &Self::_reregisterSlave,
+ slaveInfo,
+ from,
+ executorInfos,
+ tasks,
+ completedFrameworks,
+ version,
+ lambda::_1));
}
@@ -3146,6 +3160,7 @@ void Master::_reregisterSlave(
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
const vector<Archive::Framework>& completedFrameworks,
+ const string& version,
const Future<bool>& readmit)
{
slaves.reregistering.erase(slaveInfo.id());
@@ -3168,7 +3183,12 @@ void Master::_reregisterSlave(
send(pid, message);
} else {
// Re-admission succeeded.
- Slave* slave = new Slave(slaveInfo, pid, Clock::now());
+ Slave* slave = new Slave(
+ slaveInfo,
+ pid,
+ version.empty() ? Option<string>::none() : version,
+ Clock::now());
+
slave->reregisteredTime = Clock::now();
LOG(INFO) << "Re-registered slave " << *slave;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0f0b205..e97d213 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -157,13 +157,15 @@ public:
const std::string& data);
void registerSlave(
const process::UPID& from,
- const SlaveInfo& slaveInfo);
+ const SlaveInfo& slaveInfo,
+ const std::string& version);
void reregisterSlave(
const process::UPID& from,
const SlaveInfo& slaveInfo,
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
- const std::vector<Archive::Framework>& completedFrameworks);
+ const std::vector<Archive::Framework>& completedFrameworks,
+ const std::string& version);
void unregisterSlave(
const process::UPID& from,
@@ -227,6 +229,7 @@ public:
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
const std::vector<Archive::Framework>& completedFrameworks,
+ const std::string& version,
const process::Future<bool>& readmit);
MasterInfo info() const
@@ -267,6 +270,7 @@ protected:
void _registerSlave(
const SlaveInfo& slaveInfo,
const process::UPID& pid,
+ const std::string& version,
const process::Future<bool>& admit);
void __reregisterSlave(
@@ -815,11 +819,13 @@ struct Slave
{
Slave(const SlaveInfo& _info,
const process::UPID& _pid,
- const process::Time& time)
+ const Option<std::string> _version,
+ const process::Time& _registeredTime)
: id(_info.id()),
info(_info),
pid(_pid),
- registeredTime(time),
+ version(_version),
+ registeredTime(_registeredTime),
connected(true),
active(true),
observer(NULL)
@@ -954,6 +960,11 @@ struct Slave
process::UPID pid;
+ // The Mesos version of the slave. If set, the slave is >= 0.21.0.
+ // TODO(bmahler): Use stout's Version when it can parse labels, etc.
+ // TODO(bmahler): Make this required once it is always set.
+ const Option<std::string> version;
+
process::Time registeredTime;
Option<process::Time> reregisteredTime;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 16d9d67..b8039ef 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -227,6 +227,11 @@ message FrameworkErrorMessage {
message RegisterSlaveMessage {
required SlaveInfo slave = 1;
+
+ // NOTE: This is a hack for the master to detect the slave's
+ // version. If unset the slave is < 0.21.0.
+ // TODO(bmahler): Do proper versioning: MESOS-986.
+ optional string version = 2;
}
@@ -239,6 +244,11 @@ message ReregisterSlaveMessage {
repeated ExecutorInfo executor_infos = 4;
repeated Task tasks = 3;
repeated Archive.Framework completed_frameworks = 5;
+
+ // NOTE: This is a hack for the master to detect the slave's
+ // version. If unset the slave is < 0.21.0.
+ // TODO(bmahler): Do proper versioning: MESOS-986.
+ optional string version = 6;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c46ccca/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ee3d649..809b008 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -882,11 +882,15 @@ void Slave::doReliableRegistration(const Duration& duration)
if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
+ message.set_version(MESOS_VERSION);
message.mutable_slave()->CopyFrom(info);
+
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
+ message.set_version(MESOS_VERSION);
+
// TODO(bmahler): Remove in 0.22.0.
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_slave()->CopyFrom(info);
[2/8] git commit: Added validation for missing ExecutorInfo::framework_id.
Posted by bm...@apache.org.
Added validation for missing ExecutorInfo::framework_id.
Review: https://reviews.apache.org/r/26200
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8f4dd637
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8f4dd637
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8f4dd637
Branch: refs/heads/master
Commit: 8f4dd6377f6a71405551bcc1822afa2e73815fcb
Parents: e1befdc
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 11:37:30 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 30 +++++++++++++++++++++---------
1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8f4dd637/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 79588da..a1716f4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1985,6 +1985,18 @@ struct ExecutorInfoChecker : TaskInfoVisitor
}
if (task.has_executor()) {
+ // The master currently expects ExecutorInfo.framework_id
+ // to be set even though it is an optional field.
+ // Currently, the scheduler driver ensures that the field
+ // is set. For schedulers not using the driver, we need to
+ // do the validation here.
+ // TODO(bmahler): Set this field in the master instead of
+ // depending on the scheduler driver do it.
+ if (!task.executor().has_framework_id()) {
+ return stringify(
+ "Task has invalid ExecutorInfo: missing field 'framework_id'");
+ }
+
const ExecutorID& executorId = task.executor().executor_id();
Option<ExecutorInfo> executorInfo = None();
@@ -2008,15 +2020,15 @@ struct ExecutorInfoChecker : TaskInfoVisitor
}
if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
- return "Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible).\n"
- "------------------------------------------------------------\n"
- "Existing ExecutorInfo:\n" +
- stringify(executorInfo.get()) + "\n"
- "------------------------------------------------------------\n"
- "Task's ExecutorInfo:\n" +
- stringify(task.executor()) + "\n"
- "------------------------------------------------------------\n";
+ return "Task has invalid ExecutorInfo (existing ExecutorInfo"
+ " with same ExecutorID is not compatible).\n"
+ "------------------------------------------------------------\n"
+ "Existing ExecutorInfo:\n" +
+ stringify(executorInfo.get()) + "\n"
+ "------------------------------------------------------------\n"
+ "Task's ExecutorInfo:\n" +
+ stringify(task.executor()) + "\n"
+ "------------------------------------------------------------\n";
}
}
[8/8] git commit: Cleaned up Master::addFramework.
Posted by bm...@apache.org.
Cleaned up Master::addFramework.
Review: https://reviews.apache.org/r/26205
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4bd7948
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4bd7948
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4bd7948
Branch: refs/heads/master
Commit: e4bd7948976499d9eb30ff0d6741c5df64faf349
Parents: a4a0d15
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:15:21 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4bd7948/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 26cd29a..0286353 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1433,6 +1433,11 @@ void Master::_registerFramework(
}
addFramework(framework);
+
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.mutable_master_info()->MergeFrom(info_);
+ send(framework->pid, message);
}
@@ -1630,6 +1635,15 @@ void Master::_reregisterFramework(
// (above) so that we can properly determine the resources it's
// currently using!
addFramework(framework);
+
+ // TODO(bmahler): We have to send a registered message here for
+ // the re-registering framework, per the API contract. Send
+ // re-register here per MESOS-786; requires deprecation or it
+ // will break frameworks.
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.mutable_master_info()->MergeFrom(info_);
+ send(framework->pid, message);
}
CHECK(frameworks.registered.contains(frameworkInfo.id()))
@@ -3985,11 +3999,6 @@ void Master::addFramework(Framework* framework)
roles[framework->info.role()]->addFramework(framework);
- FrameworkRegisteredMessage message;
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.mutable_master_info()->MergeFrom(info_);
- send(framework->pid, message);
-
// There should be no offered resources yet!
CHECK_EQ(Resources(), framework->offeredResources);
[6/8] git commit: Removed redundant logging in the Master.
Posted by bm...@apache.org.
Removed redundant logging in the Master.
Review: https://reviews.apache.org/r/26203
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ee4f879e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ee4f879e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ee4f879e
Branch: refs/heads/master
Commit: ee4f879e070e83d0bd2b2333e3055475c194b33e
Parents: 2c46ccc
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Sep 26 16:28:04 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee4f879e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 03881df..e445c86 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,7 +2993,8 @@ void Master::_registerSlave(
version.empty() ? Option<string>::none() : version,
Clock::now());
- LOG(INFO) << "Registered slave " << *slave;
+ LOG(INFO) << "Registered slave " << *slave
+ << " with " << slave->info.resources();
++metrics.slave_registrations;
addSlave(slave);
@@ -3191,7 +3192,8 @@ void Master::_reregisterSlave(
slave->reregisteredTime = Clock::now();
- LOG(INFO) << "Re-registered slave " << *slave;
+ LOG(INFO) << "Re-registered slave " << *slave
+ << " with " << slave->info.resources();
++metrics.slave_reregistrations;
readdSlave(slave, executorInfos, tasks, completedFrameworks);
@@ -4216,9 +4218,6 @@ void Master::addSlave(Slave* slave, bool reregister)
{
CHECK_NOTNULL(slave);
- LOG(INFO) << "Adding slave " << *slave
- << " with " << slave->info.resources();
-
slaves.removed.erase(slave->id);
slaves.registered[slave->id] = slave;
[7/8] git commit: Removed the need for Master::readdSlave.
Posted by bm...@apache.org.
Removed the need for Master::readdSlave.
Review: https://reviews.apache.org/r/26204
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4a0d158
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4a0d158
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4a0d158
Branch: refs/heads/master
Commit: a4a0d1580815360083a46b43d458c5babd58c632
Parents: ee4f879
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:07:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 105 +++++++++++++++++++--------------------------
src/master/master.hpp | 24 ++++++++---
2 files changed, 60 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e445c86..26cd29a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,11 +2993,16 @@ void Master::_registerSlave(
version.empty() ? Option<string>::none() : version,
Clock::now());
- LOG(INFO) << "Registered slave " << *slave
- << " with " << slave->info.resources();
++metrics.slave_registrations;
addSlave(slave);
+
+ SlaveRegisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+
+ LOG(INFO) << "Registered slave " << *slave
+ << " with " << slave->info.resources();
}
}
@@ -3188,15 +3193,22 @@ void Master::_reregisterSlave(
slaveInfo,
pid,
version.empty() ? Option<string>::none() : version,
- Clock::now());
+ Clock::now(),
+ executorInfos,
+ tasks);
slave->reregisteredTime = Clock::now();
- LOG(INFO) << "Re-registered slave " << *slave
- << " with " << slave->info.resources();
++metrics.slave_reregistrations;
- readdSlave(slave, executorInfos, tasks, completedFrameworks);
+ addSlave(slave, completedFrameworks);
+
+ SlaveReregisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+
+ LOG(INFO) << "Re-registered slave " << *slave
+ << " with " << slave->info.resources();
__reregisterSlave(slave, tasks);
}
@@ -4214,7 +4226,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
}
-void Master::addSlave(Slave* slave, bool reregister)
+void Master::addSlave(
+ Slave* slave,
+ const vector<Archive::Framework>& completedFrameworks)
{
CHECK_NOTNULL(slave);
@@ -4223,70 +4237,37 @@ void Master::addSlave(Slave* slave, bool reregister)
link(slave->pid);
- if (!reregister) {
- SlaveRegisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(slave->pid, message);
- } else {
- SlaveReregisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(slave->pid, message);
- }
-
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
slave->pid, slave->info, slave->id, self());
spawn(slave->observer);
- if (!reregister) {
- allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
- }
-}
-
-
-void Master::readdSlave(
- Slave* slave,
- const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks,
- const vector<Archive::Framework>& completedFrameworks)
-{
- CHECK_NOTNULL(slave);
-
- addSlave(slave, true);
-
- foreach (const ExecutorInfo& executorInfo, executorInfos) {
- // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
- // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
- // to set it, and we could remove this CHECK if desired.
- CHECK(executorInfo.has_framework_id())
- << "Executor " << executorInfo.executor_id()
- << " doesn't have frameworkId set";
-
- slave->addExecutor(executorInfo.framework_id(), executorInfo);
-
- Framework* framework = getFramework(executorInfo.framework_id());
- if (framework != NULL) { // The framework might not be re-registered yet.
- framework->addExecutor(slave->id, executorInfo);
+ // Add the slave's executors to the frameworks.
+ foreachkey (const FrameworkID& frameworkId, slave->executors) {
+ foreachvalue (const ExecutorInfo& executorInfo,
+ slave->executors[frameworkId]) {
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) { // The framework might not be re-registered yet.
+ framework->addExecutor(slave->id, executorInfo);
+ }
}
}
- foreach (const Task& task, tasks) {
- Task* t = new Task(task);
-
- // Add the task to the slave.
- slave->addTask(t);
-
- Framework* framework = getFramework(task.framework_id());
- if (framework != NULL) { // The framework might not be re-registered yet.
- framework->addTask(t);
- } else {
- // TODO(benh): We should really put a timeout on how long we
- // keep tasks running on a slave that never have frameworks
- // reregister and claim them.
- LOG(WARNING) << "Possibly orphaned task " << task.task_id()
- << " of framework " << task.framework_id()
- << " running on slave " << *slave;
+ // Add the slave's tasks to the frameworks.
+ foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+ foreachvalue (Task* task, slave->tasks[frameworkId]) {
+ Framework* framework = getFramework(task->framework_id());
+ if (framework != NULL) { // The framework might not be re-registered yet.
+ framework->addTask(task);
+ } else {
+ // TODO(benh): We should really put a timeout on how long we
+ // keep tasks running on a slave that never have frameworks
+ // reregister and claim them.
+ LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+ << " of framework " << task->framework_id()
+ << " running on slave " << *slave;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e97d213..37ce31a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -335,13 +335,10 @@ protected:
void deactivate(Slave* slave);
// Add a slave.
- void addSlave(Slave* slave, bool reregister = false);
-
- void readdSlave(
+ void addSlave(
Slave* slave,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<Archive::Framework>& completedFrameworks);
+ const std::vector<Archive::Framework>& completedFrameworks =
+ std::vector<Archive::Framework>());
// Remove the slave from the registrar and from the master's state.
void removeSlave(Slave* slave);
@@ -820,7 +817,11 @@ struct Slave
Slave(const SlaveInfo& _info,
const process::UPID& _pid,
const Option<std::string> _version,
- const process::Time& _registeredTime)
+ const process::Time& _registeredTime,
+ const std::vector<ExecutorInfo> executorInfos =
+ std::vector<ExecutorInfo>(),
+ const std::vector<Task> tasks =
+ std::vector<Task>())
: id(_info.id()),
info(_info),
pid(_pid),
@@ -831,6 +832,15 @@ struct Slave
observer(NULL)
{
CHECK(_info.has_id());
+
+ foreach (const ExecutorInfo& executorInfo, executorInfos) {
+ CHECK(executorInfo.has_framework_id());
+ addExecutor(executorInfo.framework_id(), executorInfo);
+ }
+
+ foreach (const Task& task, tasks) {
+ addTask(new Task(task));
+ }
}
~Slave() {}
[3/8] git commit: Eliminated redundant resource accounting in the master.
Posted by bm...@apache.org.
Eliminated redundant resource accounting in the master.
Review: https://reviews.apache.org/r/26199
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1befdce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1befdce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1befdce
Branch: refs/heads/master
Commit: e1befdcee8edbab4ccb33d139f714b7e0e954dd8
Parents: c866484
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Sep 26 16:19:09 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700
----------------------------------------------------------------------
src/master/http.cpp | 8 ++++---
src/master/master.cpp | 24 ++++++-------------
src/master/master.hpp | 60 +++++++++++++++++++++++++++++-----------------
3 files changed, 50 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 3fd4b45..a5e34cc 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -415,9 +415,11 @@ Future<Response> Master::Http::stats(const Request& request)
totalResources += resource;
}
}
- foreach (const Resource& resource, slave->usedResources) {
- if (resource.type() == Value::SCALAR) {
- usedResources += resource;
+ foreachvalue (const Resources& resources, slave->usedResources) {
+ foreach (const Resource& resource, resources) {
+ if (resource.type() == Value::SCALAR) {
+ usedResources += resource;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e2c58d1..79588da 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4212,8 +4212,7 @@ void Master::addSlave(Slave* slave, bool reregister)
spawn(slave->observer);
if (!reregister) {
- allocator->slaveAdded(
- slave->id, slave->info, hashmap<FrameworkID, Resources>());
+ allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
}
}
@@ -4228,10 +4227,6 @@ void Master::readdSlave(
addSlave(slave, true);
- // Add the executors and tasks to the slave and framework state and
- // determine the resources that have been allocated to frameworks.
- hashmap<FrameworkID, Resources> resources;
-
foreach (const ExecutorInfo& executorInfo, executorInfos) {
// TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
// Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
@@ -4246,8 +4241,6 @@ void Master::readdSlave(
if (framework != NULL) { // The framework might not be re-registered yet.
framework->addExecutor(slave->id, executorInfo);
}
-
- resources[executorInfo.framework_id()] += executorInfo.resources();
}
foreach (const Task& task, tasks) {
@@ -4267,11 +4260,6 @@ void Master::readdSlave(
<< " of framework " << task.framework_id()
<< " running on slave " << *slave;
}
-
- // Terminal tasks do not consume resoures.
- if (!protobuf::isTerminalState(task.state())) {
- resources[task.framework_id()] += task.resources();
- }
}
// Re-add completed tasks reported by the slave.
@@ -4300,7 +4288,7 @@ void Master::readdSlave(
}
}
- allocator->slaveAdded(slave->id, slave->info, resources);
+ allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
}
@@ -5146,9 +5134,11 @@ double Master::_resources_used(const std::string& name)
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
- foreach (const Resource& resource, slave->usedResources) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- used += resource.scalar().value();
+ foreachvalue (const Resources& resources, slave->usedResources) {
+ foreach (const Resource& resource, resources) {
+ if (resource.name() == name && resource.type() == Value::SCALAR) {
+ used += resource.scalar().value();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5c0f224..5cafae3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -838,17 +838,19 @@ struct Slave
void addTask(Task* task)
{
- CHECK(!tasks[task->framework_id()].contains(task->task_id()))
- << "Duplicate task " << task->task_id()
- << " of framework " << task->framework_id();
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
+
+ CHECK(!tasks[frameworkId].contains(taskId))
+ << "Duplicate task " << taskId << " of framework " << frameworkId;
- tasks[task->framework_id()][task->task_id()] = task;
+ tasks[frameworkId][taskId] = task;
if (!protobuf::isTerminalState(task->state())) {
- usedResources += task->resources();
+ usedResources[frameworkId] += task->resources();
}
- LOG(INFO) << "Adding task " << task->task_id()
+ LOG(INFO) << "Adding task " << taskId
<< " with resources " << task->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
}
@@ -859,30 +861,40 @@ struct Slave
// functionally for all tasks is expensive, for now.
void taskTerminated(Task* task)
{
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
+
CHECK(protobuf::isTerminalState(task->state()));
- CHECK(tasks[task->framework_id()].contains(task->task_id()))
- << "Unknown task " << task->task_id()
- << " of framework " << task->framework_id();
+ CHECK(tasks[frameworkId].contains(taskId))
+ << "Unknown task " << taskId << " of framework " << frameworkId;
- usedResources -= task->resources();
+ usedResources[frameworkId] -= task->resources();
+ if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+ usedResources.erase(frameworkId);
+ }
}
void removeTask(Task* task)
{
- CHECK(tasks[task->framework_id()].contains(task->task_id()))
- << "Unknown task " << task->task_id()
- << " of framework " << task->framework_id();
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
+
+ CHECK(tasks[frameworkId].contains(taskId))
+ << "Unknown task " << taskId << " of framework " << frameworkId;
if (!protobuf::isTerminalState(task->state())) {
- usedResources -= task->resources();
+ usedResources[frameworkId] -= task->resources();
+ if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+ usedResources.erase(frameworkId);
+ }
}
- tasks[task->framework_id()].erase(task->task_id());
- if (tasks[task->framework_id()].empty()) {
- tasks.erase(task->framework_id());
+ tasks[frameworkId].erase(taskId);
+ if (tasks[frameworkId].empty()) {
+ tasks.erase(frameworkId);
}
- killedTasks.remove(task->framework_id(), task->task_id());
+ killedTasks.remove(frameworkId, taskId);
}
void addOffer(Offer* offer)
@@ -916,7 +928,7 @@ struct Slave
<< " of framework " << frameworkId;
executors[frameworkId][executorInfo.executor_id()] = executorInfo;
- usedResources += executorInfo.resources();
+ usedResources[frameworkId] += executorInfo.resources();
}
void removeExecutor(const FrameworkID& frameworkId,
@@ -925,7 +937,11 @@ struct Slave
CHECK(hasExecutor(frameworkId, executorId))
<< "Unknown executor " << executorId << " of framework " << frameworkId;
- usedResources -= executors[frameworkId][executorId].resources();
+ usedResources[frameworkId] -=
+ executors[frameworkId][executorId].resources();
+
+ // XXX Remove.
+
executors[frameworkId].erase(executorId);
if (executors[frameworkId].empty()) {
executors.erase(frameworkId);
@@ -964,8 +980,8 @@ struct Slave
// Active offers on this slave.
hashset<Offer*> offers;
- Resources usedResources; // Active task / executor resources.
- Resources offeredResources; // Offered resources.
+ hashmap<FrameworkID, Resources> usedResources; // Active task / executors.
+ Resources offeredResources; // Offers.
SlaveObserver* observer;
[4/8] git commit: Properly deprecated ReregisterSlaveMessage::slave_id.
Posted by bm...@apache.org.
Properly deprecated ReregisterSlaveMessage::slave_id.
Review: https://reviews.apache.org/r/26201
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5da57a7e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5da57a7e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5da57a7e
Branch: refs/heads/master
Commit: 5da57a7ef769d12bfbdb64f365b409853fdd9935
Parents: 8f4dd63
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 25 16:59:19 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 9 +++------
src/master/master.hpp | 9 +++++----
src/messages/messages.proto | 7 ++++---
src/slave/slave.cpp | 4 ++--
4 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a1716f4..eb7b210 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -583,7 +583,6 @@ void Master::initialize()
install<ReregisterSlaveMessage>(
&Master::reregisterSlave,
- &ReregisterSlaveMessage::slave_id,
&ReregisterSlaveMessage::slave,
&ReregisterSlaveMessage::executor_infos,
&ReregisterSlaveMessage::tasks,
@@ -2981,7 +2980,7 @@ void Master::_registerSlave(
stringify(slaveInfo.id()));
send(pid, message);
} else {
- Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
+ Slave* slave = new Slave(slaveInfo, pid, Clock::now());
LOG(INFO) << "Registered slave " << *slave;
++metrics.slave_registrations;
@@ -2993,7 +2992,6 @@ void Master::_registerSlave(
void Master::reregisterSlave(
const UPID& from,
- const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
@@ -3009,7 +3007,6 @@ void Master::reregisterSlave(
.onReady(defer(self(),
&Self::reregisterSlave,
from,
- slaveId,
slaveInfo,
executorInfos,
tasks,
@@ -3036,7 +3033,7 @@ void Master::reregisterSlave(
// re-registering. This is because a non-strict registrar cannot
// enforce this. We've already told frameworks the tasks were
// lost so it's important to deny the slave from re-registering.
- LOG(WARNING) << "Slave " << slaveId << " at " << from
+ LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from
<< " (" << slaveInfo.hostname() << ") attempted to "
<< "re-register after removal; shutting it down";
@@ -3171,7 +3168,7 @@ void Master::_reregisterSlave(
send(pid, message);
} else {
// Re-admission succeeded.
- Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
+ Slave* slave = new Slave(slaveInfo, pid, Clock::now());
slave->reregisteredTime = Clock::now();
LOG(INFO) << "Re-registered slave " << *slave;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5cafae3..0f0b205 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -160,7 +160,6 @@ public:
const SlaveInfo& slaveInfo);
void reregisterSlave(
const process::UPID& from,
- const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
@@ -815,16 +814,18 @@ private:
struct Slave
{
Slave(const SlaveInfo& _info,
- const SlaveID& _id,
const process::UPID& _pid,
const process::Time& time)
- : id(_id),
+ : id(_info.id()),
info(_info),
pid(_pid),
registeredTime(time),
connected(true),
active(true),
- observer(NULL) {}
+ observer(NULL)
+ {
+ CHECK(_info.has_id());
+ }
~Slave() {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 9ff06b3..16d9d67 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -231,9 +231,10 @@ message RegisterSlaveMessage {
message ReregisterSlaveMessage {
- // TODO(bmahler): Deprecate and remove the explicit slave_id as
- // SlaveInfo already includes this information.
- required SlaveID slave_id = 1;
+ // TODO(bmahler): slave_id is deprecated.
+ // 0.21.0: Now an optional field. Always written, never read.
+ // 0.22.0: Remove this field.
+ optional SlaveID slave_id = 1;
required SlaveInfo slave = 2;
repeated ExecutorInfo executor_infos = 4;
repeated Task tasks = 3;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5da57a7e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 853c350..ee3d649 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -879,7 +879,7 @@ void Slave::doReliableRegistration(const Duration& duration)
CHECK(state == DISCONNECTED) << state;
- if (info.id() == "") {
+ if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
message.mutable_slave()->CopyFrom(info);
@@ -887,7 +887,7 @@ void Slave::doReliableRegistration(const Duration& duration)
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
- // TODO(bmahler): deprecate this.
+ // TODO(bmahler): Remove in 0.22.0.
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_slave()->CopyFrom(info);
[5/8] git commit: Fixed a log line in the master.
Posted by bm...@apache.org.
Fixed a log line in the master.
Review: https://reviews.apache.org/r/26198
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c8664847
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c8664847
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c8664847
Branch: refs/heads/master
Commit: c86648479476c556b9fb2f5a710af258991ba6e7
Parents: 7c7f8dd
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 12:23:12 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:11 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c8664847/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 86dc544..e2c58d1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3929,7 +3929,7 @@ void Master::reconcile(
void Master::addFramework(Framework* framework)
{
CHECK(!frameworks.registered.contains(framework->id))
- << "Framework " << framework->id << "already exists!";
+ << "Framework " << framework->id << " already exists!";
frameworks.registered[framework->id] = framework;