You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/09/04 09:09:28 UTC
[1/6] git commit: Fixed the State output operators.
Updated Branches:
refs/heads/master 1c04cd4ee -> 4954b75f6
Fixed the State output operators.
Review: https://reviews.apache.org/r/13954
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3683ab68
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3683ab68
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3683ab68
Branch: refs/heads/master
Commit: 3683ab68d5979964db9afbdd3b47f452c6eda3ae
Parents: 1c04cd4
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 17:12:49 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 22:44:10 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 35 ++++++++++++++++++++++-------------
src/slave/slave.hpp | 5 +++++
2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3683ab68/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7f23b56..cefb420 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -662,6 +662,12 @@ void Slave::reregistered(const SlaveID& slaveId)
LOG(WARNING) << "Ignoring re-registration because slave is terminating";
break;
case RECOVERING:
+ // It's possible to receive a message intended for the previous
+ // run of the slave here. Short term we can leave this as is and
+ // crash in this case. Ideally responses can be tied to a
+ // particular run of the slave, see:
+ // https://issues.apache.org/jira/browse/MESOS-676
+ // https://issues.apache.org/jira/browse/MESOS-677
default:
LOG(FATAL) << "Unexpected slave state " << state;
break;
@@ -3208,7 +3214,20 @@ bool Executor::incompleteTasks()
}
-std::ostream& operator << (std::ostream& stream, Framework::State state) {
+std::ostream& operator << (std::ostream& stream, Slave::State state)
+{
+ switch (state) {
+ case Slave::RECOVERING: return stream << "RECOVERING";
+ case Slave::DISCONNECTED: return stream << "DISCONNECTED";
+ case Slave::RUNNING: return stream << "RUNNING";
+ case Slave::TERMINATING: return stream << "TERMINATING";
+ default: return stream << "UNKNOWN";
+ }
+}
+
+
+std::ostream& operator << (std::ostream& stream, Framework::State state)
+{
switch (state) {
case Framework::RUNNING: return stream << "RUNNING";
case Framework::TERMINATING: return stream << "TERMINATING";
@@ -3217,7 +3236,8 @@ std::ostream& operator << (std::ostream& stream, Framework::State state) {
}
-std::ostream& operator << (std::ostream& stream, Executor::State state) {
+std::ostream& operator << (std::ostream& stream, Executor::State state)
+{
switch (state) {
case Executor::REGISTERING: return stream << "REGISTERING";
case Executor::RUNNING: return stream << "RUNNING";
@@ -3227,17 +3247,6 @@ std::ostream& operator << (std::ostream& stream, Executor::State state) {
}
}
-
-std::ostream& operator << (std::ostream& stream, Slave::State state) {
- switch (state) {
- case Slave::RECOVERING: return stream << "RECOVERING";
- case Slave::DISCONNECTED: return stream << "DISCONNECTED";
- case Slave::RUNNING: return stream << "RUNNING";
- case Slave::TERMINATING: return stream << "TERMINATING";
- default: return stream << "UNKNOWN";
- }
-}
-
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/3683ab68/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ce2b0da..22fb74b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -447,6 +447,11 @@ private:
Framework& operator = (const Framework&); // No assigning.
};
+
+std::ostream& operator << (std::ostream& stream, Slave::State state);
+std::ostream& operator << (std::ostream& stream, Framework::State state);
+std::ostream& operator << (std::ostream& stream, Executor::State state);
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
[3/6] git commit: Fixed a bug in reconciliation that failed to
account for unknown executors.
Posted by bm...@apache.org.
Fixed a bug in reconciliation that failed to account for unknown
executors.
Review: https://reviews.apache.org/r/13921
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba9d843d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba9d843d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba9d843d
Branch: refs/heads/master
Commit: ba9d843daedcd559eb8a19137d159641b620740d
Parents: dc2ab2f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 30 15:30:59 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:06:40 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 75 +++++++++++++++++++++++++++++++++++-----------
src/master/master.hpp | 8 +++--
2 files changed, 62 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba9d843d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a2ffe7f..30abe9d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1155,7 +1155,7 @@ void Master::reregisterSlave(const SlaveID& slaveId,
// Reconcile tasks between master and the slave.
// NOTE: This needs to be done after the registration message is
// sent to the slave and the new pid is linked.
- reconcileTasks(slave, tasks);
+ reconcile(slave, executorInfos, tasks);
} else {
// NOTE: This handles the case when the slave tries to
// re-register with a failed over master.
@@ -1820,7 +1820,10 @@ Resources Master::launchTask(const TaskInfo& task,
// NOTE: This function is only called when the slave re-registers
// with a master that already knows about it (i.e., not a failed
// over master).
-void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
+void Master::reconcile(
+ Slave* slave,
+ const vector<ExecutorInfo>& executors,
+ const vector<Task>& tasks)
{
CHECK_NOTNULL(slave);
@@ -1839,7 +1842,8 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
<< " of framework " << task->framework_id()
- << " unknown to the slave " << slave->id;
+ << " unknown to the slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
@@ -1852,6 +1856,53 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
}
}
+ // Likewise, any executors that are present in the master but
+ // not present in the slave must be removed to correctly account
+ // for resources. First we index the executors for fast lookup below.
+ multihashmap<FrameworkID, ExecutorID> slaveExecutors;
+ foreach (const ExecutorInfo& executor, executors) {
+ // TODO(bmahler): The slave ensures the framework id is set in the
+ // executor info when re-registering. This can be killed in 0.15.0
+ // as we've added code in 0.14.0 to ensure the framework id is set
+ // in the scheduler driver.
+ if (!executor.has_framework_id()) {
+ LOG(ERROR) << "Slave " << slave->id
+ << " (" << slave->info.hostname() << ") "
+ << "re-registered with executor " << executor.executor_id()
+ << " without setting the framework id";
+ continue;
+ }
+ slaveExecutors.put(executor.framework_id(), executor.executor_id());
+ }
+
+ // Now that we have the index for lookup, remove all the executors
+ // in the master that are not known to the slave.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+ foreachkey (const ExecutorID& executorId,
+ utils::copy(slave->executors[frameworkId])) {
+ if (!slaveExecutors.contains(frameworkId, executorId)) {
+ LOG(WARNING) << "Removing executor " << executorId << " of framework "
+ << frameworkId << " as it is unknown to the slave "
+ << slave->id << " (" << slave->info.hostname() << ")";
+
+ // TODO(bmahler): This is duplicated in several locations, we
+ // may benefit from a method for removing an executor from
+ // all the relevant data structures and the allocator, akin
+ // to removeTask().
+ allocator->resourcesRecovered(
+ frameworkId,
+ slave->id,
+ slave->executors[frameworkId][executorId].resources());
+
+ slave->removeExecutor(frameworkId, executorId);
+
+ if (frameworks.contains(frameworkId)) {
+ frameworks[frameworkId]->removeExecutor(slave->id, executorId);
+ }
+ }
+ }
+ }
+
// Send KillTaskMessages for tasks in 'killedTasks' that are
// still alive on the slave. This could happen if the slave
// did not receive KillTaskMessage because of a partition or
@@ -2319,31 +2370,19 @@ void Master::removeOffer(Offer* offer, bool rescind)
Framework* Master::getFramework(const FrameworkID& frameworkId)
{
- if (frameworks.count(frameworkId) > 0) {
- return frameworks[frameworkId];
- } else {
- return NULL;
- }
+ return frameworks.contains(frameworkId) ? frameworks[frameworkId] : NULL;
}
Slave* Master::getSlave(const SlaveID& slaveId)
{
- if (slaves.count(slaveId) > 0) {
- return slaves[slaveId];
- } else {
- return NULL;
- }
+ return slaves.contains(slaveId) ? slaves[slaveId] : NULL;
}
Offer* Master::getOffer(const OfferID& offerId)
{
- if (offers.count(offerId) > 0) {
- return offers[offerId];
- } else {
- return NULL;
- }
+ return offers.contains(offerId) ? offers[offerId] : NULL;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba9d843d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6bd8998..19be184 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -140,10 +140,12 @@ protected:
const std::vector<TaskInfo>& tasks,
const Filters& filters);
- // Reconciles a re-registering slave's tasks and sends TASK_LOST
- // updates for tasks known to the master but unknown to the slave.
- void reconcileTasks(
+ // Reconciles a re-registering slave's tasks / executors and sends
+ // TASK_LOST updates for tasks known to the master but unknown to
+ // the slave.
+ void reconcile(
Slave* slave,
+ const std::vector<ExecutorInfo>& executors,
const std::vector<Task>& tasks);
// Add a framework.
[4/6] git commit: Added a cgroups::memory namespace.
Posted by bm...@apache.org.
Added a cgroups::memory namespace.
Review: https://reviews.apache.org/r/13903
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca4580e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca4580e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca4580e4
Branch: refs/heads/master
Commit: ca4580e44383ce17ccd679f4074ed22130d2bdfd
Parents: ba9d843
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Aug 29 13:41:17 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:08:01 2013 -0700
----------------------------------------------------------------------
src/linux/cgroups.cpp | 66 ++++++++++++++++++++
src/linux/cgroups.hpp | 33 ++++++++++
src/slave/cgroups_isolator.cpp | 119 ++++++++++++++++++------------------
3 files changed, 157 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index b97a89c..813dcb3 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1798,4 +1798,70 @@ Try<hashmap<string, uint64_t> > stat(
return result;
}
+
+namespace memory {
+
+Try<Bytes> limit_in_bytes(const string& hierarchy, const string& cgroup)
+{
+ Try<string> read = cgroups::read(
+ hierarchy, cgroup, "memory.limit_in_bytes");
+
+ if (read.isError()) {
+ return Error(read.error());
+ }
+
+ return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+
+Try<Nothing> limit_in_bytes(
+ const string& hierarchy,
+ const string& cgroup,
+ const Bytes& limit)
+{
+ return cgroups::write(
+ hierarchy, cgroup, "memory.limit_in_bytes", stringify(limit.bytes()));
+}
+
+
+Try<Bytes> soft_limit_in_bytes(const string& hierarchy, const string& cgroup)
+{
+ Try<string> read = cgroups::read(
+ hierarchy, cgroup, "memory.soft_limit_in_bytes");
+
+ if (read.isError()) {
+ return Error(read.error());
+ }
+
+ return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+
+Try<Nothing> soft_limit_in_bytes(
+ const string& hierarchy,
+ const string& cgroup,
+ const Bytes& limit)
+{
+ return cgroups::write(
+ hierarchy,
+ cgroup,
+ "memory.soft_limit_in_bytes",
+ stringify(limit.bytes()));
+}
+
+
+Try<Bytes> usage_in_bytes(const string& hierarchy, const string& cgroup)
+{
+ Try<string> read = cgroups::read(
+ hierarchy, cgroup, "memory.usage_in_bytes");
+
+ if (read.isError()) {
+ return Error(read.error());
+ }
+
+ return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+} // namespace memory {
+
} // namespace cgroups {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index 3989712..5ee64d6 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -378,6 +378,39 @@ Try<hashmap<std::string, uint64_t> > stat(
const std::string& cgroup,
const std::string& file);
+
+// Memory controls.
+namespace memory {
+
+// Returns the memory limit from memory.limit_in_bytes.
+Try<Bytes> limit_in_bytes(
+ const std::string& hierarchy,
+ const std::string& cgroup);
+
+// Sets the memory limit using memory.limit_in_bytes.
+Try<Nothing> limit_in_bytes(
+ const std::string& hierarchy,
+ const std::string& cgroup,
+ const Bytes& limit);
+
+// Returns the soft memory limit from memory.soft_limit_in_bytes.
+Try<Bytes> soft_limit_in_bytes(
+ const std::string& hierarchy,
+ const std::string& cgroup);
+
+// Sets the soft memory limit using memory.soft_limit_in_bytes.
+Try<Nothing> soft_limit_in_bytes(
+ const std::string& hierarchy,
+ const std::string& cgroup,
+ const Bytes& limit);
+
+// Returns the memory usage from memory.usage_in_bytes.
+Try<Bytes> usage_in_bytes(
+ const std::string& hierarchy,
+ const std::string& cgroup);
+
+} // namespace memory {
+
} // namespace cgroups {
#endif // __CGROUPS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 676768e..d327d65 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -86,7 +86,7 @@ const Duration CPU_CFS_PERIOD = Milliseconds(100); // Linux default.
const Duration MIN_CPU_CFS_QUOTA = Milliseconds(1);
// Memory subsystem constants.
-const size_t MIN_MEMORY_MB = 32 * Megabyte;
+const Bytes MIN_MEMORY = Megabytes(32);
// This is an approximate double precision equality check.
@@ -1041,50 +1041,53 @@ Try<Nothing> CgroupsIsolator::memChanged(
return Error("Expecting resource 'mem' to be a scalar");
}
- double mem = resource.scalar().value();
- size_t limitInBytes =
- std::max((size_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
-
- // Determine which control to set. If this is the first time we're
- // setting the limit, use 'memory.limit_in_bytes'. The "first time"
- // is determined by checking whether or not we've forked a process
- // in the cgroup yet (i.e., 'info->pid.isSome()'). If this is not
- // the first time we're setting the limit AND we're decreasing the
- // limit, use 'memory.soft_limit_in_bytes'. We do this because we
- // might not be able to decrease 'memory.limit_in_bytes' if too much
- // memory is being used. This is probably okay if the machine has
- // available resources; TODO(benh): Introduce a MemoryWatcherProcess
- // which monitors the discrepancy between usage and soft limit and
- // introduces a "manual oom" if necessary.
- string control = "memory.limit_in_bytes";
-
- if (info->pid.isSome()) {
- Try<string> read = cgroups::read(
- hierarchy, info->name(), "memory.limit_in_bytes");
- if (read.isError()) {
- return Error(
- "Failed to read 'memory.limit_in_bytes': " + read.error());
- }
-
- Try<size_t> currentLimitInBytes = numify<size_t>(strings::trim(read.get()));
- CHECK_SOME(currentLimitInBytes);
-
- if (limitInBytes <= currentLimitInBytes.get()) {
- control = "memory.soft_limit_in_bytes";
- }
- }
+ Bytes mem = Bytes((uint64_t) resource.scalar().value() * 1024LL * 1024LL);
+ Bytes limit = std::max(mem, MIN_MEMORY);
- Try<Nothing> write = cgroups::write(
- hierarchy, info->name(), control, stringify(limitInBytes));
+ // Always set the soft limit.
+ Try<Nothing> write =
+ cgroups::memory::soft_limit_in_bytes(hierarchy, info->name(), limit);
if (write.isError()) {
- return Error("Failed to update '" + control + "': " + write.error());
+ return Error("Failed to set 'memory.soft_limit_in_bytes': "
+ + write.error());
}
- LOG(INFO) << "Updated '" << control << "' to " << limitInBytes
+ LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << limit
<< " for executor " << info->executorId
<< " of framework " << info->frameworkId;
+ // Read the existing limit.
+ Try<Bytes> currentLimit =
+ cgroups::memory::limit_in_bytes(hierarchy, info->name());
+
+ if (currentLimit.isError()) {
+ return Error(
+ "Failed to read 'memory.limit_in_bytes': " + currentLimit.error());
+ }
+
+ // Determine whether to set the hard limit. If this is the first
+ // time (info->pid.isNone()), or we're raising the existing limit,
+ // then we can update the hard limit safely. Otherwise, if we need
+ // to decrease 'memory.limit_in_bytes' we may induce an OOM if too
+ // much memory is in use. As a result, we only update the soft
+ // limit when the memory reservation is being reduced. This is
+ // probably okay if the machine has available resources.
+ // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
+ // discrepancy between usage and soft limit and introduces a
+ // "manual oom" if necessary.
+ if (info->pid.isNone() || limit > currentLimit.get()) {
+ write = cgroups::memory::limit_in_bytes(hierarchy, info->name(), limit);
+
+ if (write.isError()) {
+ return Error("Failed to set 'memory.limit_in_bytes': " + write.error());
+ }
+
+ LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit
+ << " for executor " << info->executorId
+ << " of framework " << info->frameworkId;
+ }
+
return Nothing();
}
@@ -1181,35 +1184,29 @@ void CgroupsIsolator::oom(
ostringstream message;
message << "Memory limit exceeded: ";
- Try<string> read = cgroups::read(
- hierarchy, info->name(), "memory.limit_in_bytes");
- if (read.isSome()) {
- Try<uint64_t> bytes = numify<uint64_t>(strings::trim(read.get()));
- if (bytes.isError()) {
- LOG(ERROR)
- << "Failed to numify 'memory.limit_in_bytes': " << bytes.error();
- message << "Requested: " << strings::trim(read.get()) << " bytes ";
- } else {
- message << "Requested: " << Bytes(bytes.get()) << " ";
- }
+ // Output the requested memory limit.
+ Try<Bytes> limit = cgroups::memory::limit_in_bytes(hierarchy, info->name());
+
+ if (limit.isError()) {
+ LOG(ERROR) << "Failed to read 'memory.limit_in_bytes': " << limit.error();
+ } else {
+ message << "Requested: " << limit.get() << " ";
}
- // Output 'memory.usage_in_bytes'.
- read = cgroups::read(hierarchy, info->name(), "memory.usage_in_bytes");
- if (read.isSome()) {
- Try<uint64_t> bytes = numify<uint64_t>(strings::trim(read.get()));
- if (bytes.isError()) {
- LOG(ERROR)
- << "Failed to numify 'memory.usage_in_bytes': " << bytes.error();
- message << "Used: " << strings::trim(read.get()) << " bytes\n";
- } else {
- message << "Used: " << Bytes(bytes.get()) << "\n";
- }
+ // Output the memory usage.
+ Try<Bytes> usage = cgroups::memory::usage_in_bytes(hierarchy, info->name());
+
+ if (usage.isError()) {
+ LOG(ERROR) << "Failed to read 'memory.usage_in_bytes': " << usage.error();
+ } else {
+ message << "Used: " << usage.get() << "\n";
}
// Output 'memory.stat' of the cgroup to help with debugging.
- read = cgroups::read(hierarchy, info->name(), "memory.stat");
- if (read.isSome()) {
+ Try<string> read = cgroups::read(hierarchy, info->name(), "memory.stat");
+ if (read.isError()) {
+ LOG(ERROR) << "Failed to read 'memory.stat': " << read.error();
+ } else {
message << "\nMEMORY STATISTICS: \n" << read.get() << "\n";
}
[2/6] git commit: Fixed a CHECK failure in the master when successive
exited events occur for a disconnected slave.
Posted by bm...@apache.org.
Fixed a CHECK failure in the master when successive exited events
occur for a disconnected slave.
Review: https://reviews.apache.org/r/13956
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc2ab2ff
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc2ab2ff
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc2ab2ff
Branch: refs/heads/master
Commit: dc2ab2ff4b868c479701fcf6aa2d43903e32b95c
Parents: 3683ab6
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 17:53:03 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 22:48:43 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2ab2ff/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 874d7fa..a2ffe7f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -557,11 +557,7 @@ void Master::exited(const UPID& pid)
<< "because it is not checkpointing!";
removeSlave(slave);
return;
- } else {
- CHECK(!slave->disconnected)
- << "Slave " << slave->id << " ("
- << slave->info.hostname() << ") already disconnected!" ;
-
+ } else if (!slave->disconnected) {
// Mark the slave as disconnected and remove it from the allocator.
slave->disconnected = true;
@@ -601,6 +597,10 @@ void Master::exited(const UPID& pid)
// Remove and rescind offers.
removeOffer(offer, true); // Rescind!
}
+ } else {
+ LOG(WARNING) << "Ignoring duplicate exited() notification for "
+ << "checkpointing slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
}
}
}
@@ -1143,6 +1143,12 @@ void Master::reregisterSlave(const SlaveID& slaveId,
reply(message);
// Update the slave pid and relink to it.
+ // NOTE: Re-linking the slave here always rather than only when
+ // the slave is disconnected can lead to multiple exited events
+ // in succession for a disconnected slave. As a result, we
+ // ignore duplicate exited events for disconnected checkpointing
+ // slaves.
+ // See: https://issues.apache.org/jira/browse/MESOS-675
slave->pid = from;
link(slave->pid);
[6/6] git commit: Exposing cpu.stat statistics in the monitoring
endpoint.
Posted by bm...@apache.org.
Exposing cpu.stat statistics in the monitoring endpoint.
From: Christina Delimitrou <ch...@gmail.com>
Review: https://reviews.apache.org/r/13868
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4954b75f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4954b75f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4954b75f
Branch: refs/heads/master
Commit: 4954b75f6fbe2d23fb3e11489441036bd3676017
Parents: 189cefe
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 23:12:55 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 4 00:09:09 2013 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 5 +++++
src/slave/cgroups_isolator.cpp | 23 +++++++++++++++++++++
src/slave/constants.cpp | 2 +-
src/slave/monitor.cpp | 41 +++++++++++++++++++++++++++++++++++++
src/tests/monitor_tests.cpp | 13 ++++++++++--
5 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index bbf1d31..8f845cc 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -274,6 +274,11 @@ message ResourceStatistics {
// Number of CPUs allocated.
required double cpus_limit = 4;
+ // cpu.stat on process throttling (for contention issues).
+ optional uint32 cpus_nr_periods = 7;
+ optional uint32 cpus_nr_throttled = 8;
+ optional double cpus_throttled_time_secs = 9;
+
// Memory Usage Information:
optional uint64 mem_rss_bytes = 5; // Resident Set Size.
http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index ec9f8ec..a1f5b32 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -782,6 +782,29 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
result.set_mem_rss_bytes(stat.get()["rss"]);
}
+ // Add the cpu.stat information.
+ stat = cgroups::stat(hierarchy, info->name(), "cpu.stat");
+
+ if (stat.isError()) {
+ return Future<ResourceStatistics>::failed(
+ "Failed to read cpu.stat: " + stat.error());
+ }
+
+ if (stat.get().contains("nr_periods")) {
+ result.set_cpus_nr_periods(
+ (uint32_t) stat.get()["nr_periods"]);
+ }
+
+ if (stat.get().contains("nr_throttled")) {
+ result.set_cpus_nr_throttled(
+ (uint32_t) stat.get()["nr_throttled"]);
+ }
+
+ if (stat.get().contains("throttled_time")) {
+ result.set_cpus_throttled_time_secs(
+ Nanoseconds(stat.get()["throttled_time"]).secs());
+ }
+
return result;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 8c74c00..5573d39 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -30,7 +30,7 @@ const Duration GC_DELAY = Weeks(1);
const double GC_DISK_HEADROOM = 0.1;
const Duration DISK_WATCH_INTERVAL = Minutes(1);
const Duration RECOVERY_TIMEOUT = Minutes(15);
-const Duration RESOURCE_MONITORING_INTERVAL = Seconds(5);
+const Duration RESOURCE_MONITORING_INTERVAL = Seconds(1);
const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
const uint32_t MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;
const uint32_t MAX_COMPLETED_TASKS_PER_EXECUTOR = 200;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 8e1eb35..9cb6256 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -52,12 +52,16 @@ using process::wait; // Necessary on some OS's to disambiguate.
// These match the names in the ResourceStatistics protobuf.
// TODO(bmahler): Later, when we have a richer monitoring story,
// we will want to publish these outside of this file.
+// TODO(cdel): Check if we need any more of the cgroup stats.
const std::string CPUS_TIME_SECS = "cpus_time_secs";
const std::string CPUS_USER_TIME_SECS = "cpus_user_time_secs";
const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
const std::string CPUS_LIMIT = "cpus_limit";
const std::string MEM_RSS_BYTES = "mem_rss_bytes";
const std::string MEM_LIMIT_BYTES = "mem_limit_bytes";
+const std::string CPUS_NR_PERIODS = "cpus_nr_periods";
+const std::string CPUS_NR_THROTTLED = "cpus_nr_throttled";
+const std::string CPUS_THROTTLED_TIME_SECS = "cpus_throttled_time_secs";
// TODO(bmahler): Deprecated statistical names, these will be removed!
const std::string CPU_TIME = "cpu_time";
@@ -126,6 +130,9 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
::statistics->archive("monitor", prefix + CPUS_LIMIT);
::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
+ ::statistics->archive("monitor", prefix + CPUS_NR_PERIODS);
+ ::statistics->archive("monitor", prefix + CPUS_NR_THROTTLED);
+ ::statistics->archive("monitor", prefix + CPUS_THROTTLED_TIME_SECS);
if (!watches.contains(frameworkId) ||
!watches[frameworkId].contains(executorId)) {
@@ -248,6 +255,23 @@ void publish(
prefix + MEM_LIMIT_BYTES,
statistics.mem_limit_bytes(),
time);
+
+ // Publish cpu.stat statistics.
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_NR_PERIODS,
+ statistics.cpus_nr_periods(),
+ time);
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_NR_THROTTLED,
+ statistics.cpus_nr_throttled(),
+ time);
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_THROTTLED_TIME_SECS,
+ statistics.cpus_throttled_time_secs(),
+ time);
}
@@ -286,6 +310,9 @@ Future<http::Response> _statisticsJSON(
usage.values[CPUS_LIMIT] = 0;
usage.values[MEM_RSS_BYTES] = 0;
usage.values[MEM_LIMIT_BYTES] = 0;
+ usage.values[CPUS_NR_PERIODS] = 0;
+ usage.values[CPUS_NR_THROTTLED] = 0;
+ usage.values[CPUS_THROTTLED_TIME_SECS] = 0;
// Set the cpu usage data if present.
if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
@@ -310,6 +337,20 @@ Future<http::Response> _statisticsJSON(
statistics.find(prefix + MEM_LIMIT_BYTES)->second;
}
+ // Set the cpu.stat data if present.
+ if (statistics.count(prefix + CPUS_NR_PERIODS) > 0) {
+ usage.values[CPUS_NR_PERIODS] =
+ statistics.find(prefix + CPUS_NR_PERIODS)->second;
+ }
+ if (statistics.count(prefix + CPUS_NR_THROTTLED) > 0) {
+ usage.values[CPUS_NR_THROTTLED] =
+ statistics.find(prefix + CPUS_NR_THROTTLED)->second;
+ }
+ if (statistics.count(prefix + CPUS_THROTTLED_TIME_SECS) > 0) {
+ usage.values[CPUS_THROTTLED_TIME_SECS] =
+ statistics.find(prefix + CPUS_THROTTLED_TIME_SECS)->second;
+ }
+
JSON::Object entry;
entry.values["framework_id"] = frameworkId.value();
entry.values["executor_id"] = executorId.value();
http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 3142416..3d3f8af 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -75,15 +75,18 @@ TEST(MonitorTest, WatchUnwatch)
ResourceStatistics initialStatistics;
initialStatistics.set_cpus_user_time_secs(0);
initialStatistics.set_cpus_system_time_secs(0);
- initialStatistics.set_cpus_limit(1.0);
+ initialStatistics.set_cpus_limit(2.5);
initialStatistics.set_mem_rss_bytes(0);
initialStatistics.set_mem_limit_bytes(2048);
initialStatistics.set_timestamp(Clock::now().secs());
ResourceStatistics statistics;
+ statistics.set_cpus_nr_periods(100);
+ statistics.set_cpus_nr_throttled(2);
statistics.set_cpus_user_time_secs(4);
statistics.set_cpus_system_time_secs(1);
- statistics.set_cpus_limit(1.0);
+ statistics.set_cpus_throttled_time_secs(0.5);
+ statistics.set_cpus_limit(2.5);
statistics.set_mem_rss_bytes(1024);
statistics.set_mem_limit_bytes(2048);
statistics.set_timestamp(
@@ -183,14 +186,20 @@ TEST(MonitorTest, WatchUnwatch)
"\"source\":\"source\","
"\"statistics\":{"
"\"cpus_limit\":%g,"
+ "\"cpus_nr_periods\":%d,"
+ "\"cpus_nr_throttled\":%d,"
"\"cpus_system_time_secs\":%g,"
+ "\"cpus_throttled_time_secs\":%g,"
"\"cpus_user_time_secs\":%g,"
"\"mem_limit_bytes\":%lu,"
"\"mem_rss_bytes\":%lu"
"}"
"}]",
statistics.cpus_limit(),
+ statistics.cpus_nr_periods(),
+ statistics.cpus_nr_throttled(),
statistics.cpus_system_time_secs(),
+ statistics.cpus_throttled_time_secs(),
statistics.cpus_user_time_secs(),
statistics.mem_limit_bytes(),
statistics.mem_rss_bytes()).get(),
[5/6] git commit: Fixed CgroupsIsolator to listen for OOMs of
recovered executors.
Posted by bm...@apache.org.
Fixed CgroupsIsolator to listen for OOMs of recovered executors.
Review: https://reviews.apache.org/r/13904
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/189cefe3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/189cefe3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/189cefe3
Branch: refs/heads/master
Commit: 189cefe3a540ee448161773f6b6b88b8e451f151
Parents: ca4580e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Aug 29 13:56:05 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:08:01 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/189cefe3/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index d327d65..ec9f8ec 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -526,7 +526,7 @@ void CgroupsIsolator::launchExecutor(
<< ": " << create.error();
}
- // Setup the initial resource constrains.
+ // Setup the initial resource constraints.
resourcesChanged(frameworkId, executorId, resources);
// Start listening on OOM events.
@@ -846,11 +846,7 @@ Future<Nothing> CgroupsIsolator::recover(
info->destroyed,
info->message);
- // We make a copy here because 'info' will be deleted when
- // we unregister.
- unregisterCgroupInfo(
- utils::copy(info->frameworkId),
- utils::copy(info->executorId));
+ unregisterCgroupInfo(framework.id, executor.id);
continue;
}
@@ -865,6 +861,16 @@ Future<Nothing> CgroupsIsolator::recover(
run.forkedPid.get(),
lambda::_1));
}
+
+ // Start listening for OOMs. If the executor OOMed while the
+ // slave was down or recovering, the cgroup will already be
+ // under_oom, resulting in immediate notification.
+ // TODO(bmahler): I've been unable to find documentation
+ // guaranteeing this, but the kernel source indicates they
+ // notify if already under_oom.
+ if (subsystems.contains("memory")) {
+ oomListen(framework.id, executor.id);
+ }
}
}
}