You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/09/04 09:09:28 UTC

[1/6] git commit: Fixed the State output operators.

Updated Branches:
  refs/heads/master 1c04cd4ee -> 4954b75f6


Fixed the State output operators.

Review: https://reviews.apache.org/r/13954


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3683ab68
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3683ab68
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3683ab68

Branch: refs/heads/master
Commit: 3683ab68d5979964db9afbdd3b47f452c6eda3ae
Parents: 1c04cd4
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 17:12:49 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 22:44:10 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 35 ++++++++++++++++++++++-------------
 src/slave/slave.hpp |  5 +++++
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3683ab68/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7f23b56..cefb420 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -662,6 +662,12 @@ void Slave::reregistered(const SlaveID& slaveId)
       LOG(WARNING) << "Ignoring re-registration because slave is terminating";
       break;
     case RECOVERING:
+      // It's possible to receive a message intended for the previous
+      // run of the slave here. Short term we can leave this as is and
+      // crash in this case. Ideally responses can be tied to a
+      // particular run of the slave, see:
+      // https://issues.apache.org/jira/browse/MESOS-676
+      // https://issues.apache.org/jira/browse/MESOS-677
     default:
       LOG(FATAL) << "Unexpected slave state " << state;
       break;
@@ -3208,7 +3214,20 @@ bool Executor::incompleteTasks()
 }
 
 
-std::ostream& operator << (std::ostream& stream, Framework::State state) {
+std::ostream& operator << (std::ostream& stream, Slave::State state)
+{
+  switch (state) {
+    case Slave::RECOVERING:   return stream << "RECOVERING";
+    case Slave::DISCONNECTED: return stream << "DISCONNECTED";
+    case Slave::RUNNING:      return stream << "RUNNING";
+    case Slave::TERMINATING:  return stream << "TERMINATING";
+    default:                  return stream << "UNKNOWN";
+  }
+}
+
+
+std::ostream& operator << (std::ostream& stream, Framework::State state)
+{
   switch (state) {
     case Framework::RUNNING:     return stream << "RUNNING";
     case Framework::TERMINATING: return stream << "TERMINATING";
@@ -3217,7 +3236,8 @@ std::ostream& operator << (std::ostream& stream, Framework::State state) {
 }
 
 
-std::ostream& operator << (std::ostream& stream, Executor::State state) {
+std::ostream& operator << (std::ostream& stream, Executor::State state)
+{
   switch (state) {
     case Executor::REGISTERING: return stream << "REGISTERING";
     case Executor::RUNNING:     return stream << "RUNNING";
@@ -3227,17 +3247,6 @@ std::ostream& operator << (std::ostream& stream, Executor::State state) {
   }
 }
 
-
-std::ostream& operator << (std::ostream& stream, Slave::State state) {
-  switch (state) {
-    case Slave::RECOVERING:   return stream << "RECOVERING";
-    case Slave::DISCONNECTED: return stream << "DISCONNECTED";
-    case Slave::RUNNING:      return stream << "RUNNING";
-    case Slave::TERMINATING:  return stream << "TERMINATING";
-    default:                  return stream << "UNKNOWN";
-  }
-}
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3683ab68/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ce2b0da..22fb74b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -447,6 +447,11 @@ private:
   Framework& operator = (const Framework&); // No assigning.
 };
 
+
+std::ostream& operator << (std::ostream& stream, Slave::State state);
+std::ostream& operator << (std::ostream& stream, Framework::State state);
+std::ostream& operator << (std::ostream& stream, Executor::State state);
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {


[3/6] git commit: Fixed a bug in reconciliation that failed to account for unknown executors.

Posted by bm...@apache.org.
Fixed a bug in reconciliation that failed to account for unknown
executors.

Review: https://reviews.apache.org/r/13921


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba9d843d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba9d843d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba9d843d

Branch: refs/heads/master
Commit: ba9d843daedcd559eb8a19137d159641b620740d
Parents: dc2ab2f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 30 15:30:59 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:06:40 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 75 +++++++++++++++++++++++++++++++++++-----------
 src/master/master.hpp |  8 +++--
 2 files changed, 62 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ba9d843d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a2ffe7f..30abe9d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1155,7 +1155,7 @@ void Master::reregisterSlave(const SlaveID& slaveId,
       // Reconcile tasks between master and the slave.
       // NOTE: This needs to be done after the registration message is
       // sent to the slave and the new pid is linked.
-      reconcileTasks(slave, tasks);
+      reconcile(slave, executorInfos, tasks);
     } else {
       // NOTE: This handles the case when the slave tries to
       // re-register with a failed over master.
@@ -1820,7 +1820,10 @@ Resources Master::launchTask(const TaskInfo& task,
 // NOTE: This function is only called when the slave re-registers
 // with a master that already knows about it (i.e., not a failed
 // over master).
-void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
+void Master::reconcile(
+    Slave* slave,
+    const vector<ExecutorInfo>& executors,
+    const vector<Task>& tasks)
 {
   CHECK_NOTNULL(slave);
 
@@ -1839,7 +1842,8 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
     if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
       LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
                    << " of framework " << task->framework_id()
-                   << " unknown to the slave " << slave->id;
+                   << " unknown to the slave " << slave->id
+                   << " (" << slave->info.hostname() << ")";
 
       const StatusUpdate& update = protobuf::createStatusUpdate(
           task->framework_id(),
@@ -1852,6 +1856,53 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
     }
   }
 
+  // Likewise, any executors that are present in the master but
+  // not present in the slave must be removed to correctly account
+  // for resources. First we index the executors for fast lookup below.
+  multihashmap<FrameworkID, ExecutorID> slaveExecutors;
+  foreach (const ExecutorInfo& executor, executors) {
+    // TODO(bmahler): The slave ensures the framework id is set in the
+    // executor info when re-registering. This check can be removed in
+    // 0.15.0, as we've added code in 0.14.0 to ensure the framework id
+    // is set in the scheduler driver.
+    if (!executor.has_framework_id()) {
+      LOG(ERROR) << "Slave " << slave->id
+                 << " (" << slave->info.hostname() << ") "
+                 << "re-registered with executor " << executor.executor_id()
+                 << " without setting the framework id";
+      continue;
+    }
+    slaveExecutors.put(executor.framework_id(), executor.executor_id());
+  }
+
+  // Now that we have the index for lookup, remove all the executors
+  // in the master that are not known to the slave.
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+    foreachkey (const ExecutorID& executorId,
+                utils::copy(slave->executors[frameworkId])) {
+      if (!slaveExecutors.contains(frameworkId, executorId)) {
+        LOG(WARNING) << "Removing executor " << executorId << " of framework "
+                     << frameworkId << " as it is unknown to the slave "
+                     << slave->id << " (" << slave->info.hostname() << ")";
+
+        // TODO(bmahler): This is duplicated in several locations, we
+        // may benefit from a method for removing an executor from
+        // all the relevant data structures and the allocator, akin
+        // to removeTask().
+        allocator->resourcesRecovered(
+            frameworkId,
+            slave->id,
+            slave->executors[frameworkId][executorId].resources());
+
+        slave->removeExecutor(frameworkId, executorId);
+
+        if (frameworks.contains(frameworkId)) {
+          frameworks[frameworkId]->removeExecutor(slave->id, executorId);
+        }
+      }
+    }
+  }
+
   // Send KillTaskMessages for tasks in 'killedTasks' that are
   // still alive on the slave. This could happen if the slave
   // did not receive KillTaskMessage because of a partition or
@@ -2319,31 +2370,19 @@ void Master::removeOffer(Offer* offer, bool rescind)
 
 Framework* Master::getFramework(const FrameworkID& frameworkId)
 {
-  if (frameworks.count(frameworkId) > 0) {
-    return frameworks[frameworkId];
-  } else {
-    return NULL;
-  }
+  return frameworks.contains(frameworkId) ? frameworks[frameworkId] : NULL;
 }
 
 
 Slave* Master::getSlave(const SlaveID& slaveId)
 {
-  if (slaves.count(slaveId) > 0) {
-    return slaves[slaveId];
-  } else {
-    return NULL;
-  }
+  return slaves.contains(slaveId) ? slaves[slaveId] : NULL;
 }
 
 
 Offer* Master::getOffer(const OfferID& offerId)
 {
-  if (offers.count(offerId) > 0) {
-    return offers[offerId];
-  } else {
-    return NULL;
-  }
+  return offers.contains(offerId) ? offers[offerId] : NULL;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba9d843d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6bd8998..19be184 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -140,10 +140,12 @@ protected:
       const std::vector<TaskInfo>& tasks,
       const Filters& filters);
 
-  // Reconciles a re-registering slave's tasks and sends TASK_LOST
-  // updates for tasks known to the master but unknown to the slave.
-  void reconcileTasks(
+  // Reconciles a re-registering slave's tasks / executors: sends
+  // TASK_LOST updates for tasks, and removes executors, that are
+  // known to the master but unknown to the slave.
+  void reconcile(
       Slave* slave,
+      const std::vector<ExecutorInfo>& executors,
       const std::vector<Task>& tasks);
 
   // Add a framework.


[4/6] git commit: Added a cgroups::memory namespace.

Posted by bm...@apache.org.
Added a cgroups::memory namespace.

Review: https://reviews.apache.org/r/13903


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca4580e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca4580e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca4580e4

Branch: refs/heads/master
Commit: ca4580e44383ce17ccd679f4074ed22130d2bdfd
Parents: ba9d843
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Aug 29 13:41:17 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:08:01 2013 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp          |  66 ++++++++++++++++++++
 src/linux/cgroups.hpp          |  33 ++++++++++
 src/slave/cgroups_isolator.cpp | 119 ++++++++++++++++++------------------
 3 files changed, 157 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index b97a89c..813dcb3 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1798,4 +1798,70 @@ Try<hashmap<string, uint64_t> > stat(
   return result;
 }
 
+
+namespace memory {
+
+Try<Bytes> limit_in_bytes(const string& hierarchy, const string& cgroup)
+{
+  Try<string> read = cgroups::read(
+      hierarchy, cgroup, "memory.limit_in_bytes");
+
+  if (read.isError()) {
+    return Error(read.error());
+  }
+
+  return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+
+Try<Nothing> limit_in_bytes(
+    const string& hierarchy,
+    const string& cgroup,
+    const Bytes& limit)
+{
+  return cgroups::write(
+      hierarchy, cgroup, "memory.limit_in_bytes", stringify(limit.bytes()));
+}
+
+
+Try<Bytes> soft_limit_in_bytes(const string& hierarchy, const string& cgroup)
+{
+  Try<string> read = cgroups::read(
+      hierarchy, cgroup, "memory.soft_limit_in_bytes");
+
+  if (read.isError()) {
+    return Error(read.error());
+  }
+
+  return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+
+Try<Nothing> soft_limit_in_bytes(
+    const string& hierarchy,
+    const string& cgroup,
+    const Bytes& limit)
+{
+  return cgroups::write(
+      hierarchy,
+      cgroup,
+      "memory.soft_limit_in_bytes",
+      stringify(limit.bytes()));
+}
+
+
+Try<Bytes> usage_in_bytes(const string& hierarchy, const string& cgroup)
+{
+  Try<string> read = cgroups::read(
+      hierarchy, cgroup, "memory.usage_in_bytes");
+
+  if (read.isError()) {
+    return Error(read.error());
+  }
+
+  return Bytes::parse(strings::trim(read.get()) + "B");
+}
+
+} // namespace memory {
+
 } // namespace cgroups {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index 3989712..5ee64d6 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -378,6 +378,39 @@ Try<hashmap<std::string, uint64_t> > stat(
     const std::string& cgroup,
     const std::string& file);
 
+
+// Memory controls.
+namespace memory {
+
+// Returns the memory limit from memory.limit_in_bytes.
+Try<Bytes> limit_in_bytes(
+    const std::string& hierarchy,
+    const std::string& cgroup);
+
+// Sets the memory limit using memory.limit_in_bytes.
+Try<Nothing> limit_in_bytes(
+    const std::string& hierarchy,
+    const std::string& cgroup,
+    const Bytes& limit);
+
+// Returns the soft memory limit from memory.soft_limit_in_bytes.
+Try<Bytes> soft_limit_in_bytes(
+    const std::string& hierarchy,
+    const std::string& cgroup);
+
+// Sets the soft memory limit using memory.soft_limit_in_bytes.
+Try<Nothing> soft_limit_in_bytes(
+    const std::string& hierarchy,
+    const std::string& cgroup,
+    const Bytes& limit);
+
+// Returns the memory usage from memory.usage_in_bytes.
+Try<Bytes> usage_in_bytes(
+    const std::string& hierarchy,
+    const std::string& cgroup);
+
+} // namespace memory {
+
 } // namespace cgroups {
 
 #endif // __CGROUPS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca4580e4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 676768e..d327d65 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -86,7 +86,7 @@ const Duration CPU_CFS_PERIOD = Milliseconds(100); // Linux default.
 const Duration MIN_CPU_CFS_QUOTA = Milliseconds(1);
 
 // Memory subsystem constants.
-const size_t MIN_MEMORY_MB = 32 * Megabyte;
+const Bytes MIN_MEMORY = Megabytes(32);
 
 
 // This is an approximate double precision equality check.
@@ -1041,50 +1041,53 @@ Try<Nothing> CgroupsIsolator::memChanged(
     return Error("Expecting resource 'mem' to be a scalar");
   }
 
-  double mem = resource.scalar().value();
-  size_t limitInBytes =
-    std::max((size_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
-
-  // Determine which control to set. If this is the first time we're
-  // setting the limit, use 'memory.limit_in_bytes'. The "first time"
-  // is determined by checking whether or not we've forked a process
-  // in the cgroup yet (i.e., 'info->pid.isSome()'). If this is not
-  // the first time we're setting the limit AND we're decreasing the
-  // limit, use 'memory.soft_limit_in_bytes'. We do this because we
-  // might not be able to decrease 'memory.limit_in_bytes' if too much
-  // memory is being used. This is probably okay if the machine has
-  // available resources; TODO(benh): Introduce a MemoryWatcherProcess
-  // which monitors the discrepancy between usage and soft limit and
-  // introduces a "manual oom" if necessary.
-  string control = "memory.limit_in_bytes";
-
-  if (info->pid.isSome()) {
-    Try<string> read = cgroups::read(
-        hierarchy, info->name(), "memory.limit_in_bytes");
-    if (read.isError()) {
-      return Error(
-          "Failed to read 'memory.limit_in_bytes': " + read.error());
-    }
-
-    Try<size_t> currentLimitInBytes = numify<size_t>(strings::trim(read.get()));
-    CHECK_SOME(currentLimitInBytes);
-
-    if (limitInBytes <= currentLimitInBytes.get()) {
-      control = "memory.soft_limit_in_bytes";
-    }
-  }
+  Bytes mem = Bytes((uint64_t) resource.scalar().value() * 1024LL * 1024LL);
+  Bytes limit = std::max(mem, MIN_MEMORY);
 
-  Try<Nothing> write = cgroups::write(
-      hierarchy, info->name(), control, stringify(limitInBytes));
+  // Always set the soft limit.
+  Try<Nothing> write =
+    cgroups::memory::soft_limit_in_bytes(hierarchy, info->name(), limit);
 
   if (write.isError()) {
-    return Error("Failed to update '" + control + "': " + write.error());
+    return Error("Failed to set 'memory.soft_limit_in_bytes': "
+        + write.error());
   }
 
-  LOG(INFO) << "Updated '" << control << "' to " << limitInBytes
+  LOG(INFO) << "Updated 'memory.soft_limit_in_bytes' to " << limit
             << " for executor " << info->executorId
             << " of framework " << info->frameworkId;
 
+  // Read the existing limit.
+  Try<Bytes> currentLimit =
+    cgroups::memory::limit_in_bytes(hierarchy, info->name());
+
+  if (currentLimit.isError()) {
+    return Error(
+        "Failed to read 'memory.limit_in_bytes': " + currentLimit.error());
+  }
+
+  // Determine whether to set the hard limit. If this is the first
+  // time (info->pid.isNone()), or we're raising the existing limit,
+  // then we can update the hard limit safely. Otherwise, if we need
+  // to decrease 'memory.limit_in_bytes' we may induce an OOM if too
+  // much memory is in use. As a result, we only update the soft
+  // limit when the memory reservation is being reduced. This is
+  // probably okay if the machine has available resources.
+  // TODO(benh): Introduce a MemoryWatcherProcess which monitors the
+  // discrepancy between usage and soft limit and introduces a
+  // "manual oom" if necessary.
+  if (info->pid.isNone() || limit > currentLimit.get()) {
+    write = cgroups::memory::limit_in_bytes(hierarchy, info->name(), limit);
+
+    if (write.isError()) {
+      return Error("Failed to set 'memory.limit_in_bytes': " + write.error());
+    }
+
+    LOG(INFO) << "Updated 'memory.limit_in_bytes' to " << limit
+              << " for executor " << info->executorId
+              << " of framework " << info->frameworkId;
+  }
+
   return Nothing();
 }
 
@@ -1181,35 +1184,29 @@ void CgroupsIsolator::oom(
   ostringstream message;
   message << "Memory limit exceeded: ";
 
-  Try<string> read = cgroups::read(
-      hierarchy, info->name(), "memory.limit_in_bytes");
-  if (read.isSome()) {
-    Try<uint64_t> bytes = numify<uint64_t>(strings::trim(read.get()));
-    if (bytes.isError()) {
-      LOG(ERROR)
-        << "Failed to numify 'memory.limit_in_bytes': " << bytes.error();
-      message << "Requested: " << strings::trim(read.get()) << " bytes ";
-    } else {
-      message << "Requested: " << Bytes(bytes.get()) << " ";
-    }
+  // Output the requested memory limit.
+  Try<Bytes> limit = cgroups::memory::limit_in_bytes(hierarchy, info->name());
+
+  if (limit.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.limit_in_bytes': " << limit.error();
+  } else {
+    message << "Requested: " << limit.get() << " ";
   }
 
-  // Output 'memory.usage_in_bytes'.
-  read = cgroups::read(hierarchy, info->name(), "memory.usage_in_bytes");
-  if (read.isSome()) {
-    Try<uint64_t> bytes = numify<uint64_t>(strings::trim(read.get()));
-    if (bytes.isError()) {
-      LOG(ERROR)
-        << "Failed to numify 'memory.usage_in_bytes': " << bytes.error();
-      message << "Used: " << strings::trim(read.get()) << " bytes\n";
-    } else {
-      message << "Used: " << Bytes(bytes.get()) << "\n";
-    }
+  // Output the memory usage.
+  Try<Bytes> usage = cgroups::memory::usage_in_bytes(hierarchy, info->name());
+
+  if (usage.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.usage_in_bytes': " << usage.error();
+  } else {
+    message << "Used: " << usage.get() << "\n";
   }
 
   // Output 'memory.stat' of the cgroup to help with debugging.
-  read = cgroups::read(hierarchy, info->name(), "memory.stat");
-  if (read.isSome()) {
+  Try<string> read = cgroups::read(hierarchy, info->name(), "memory.stat");
+  if (read.isError()) {
+    LOG(ERROR) << "Failed to read 'memory.stat': " << read.error();
+  } else {
     message << "\nMEMORY STATISTICS: \n" << read.get() << "\n";
   }
 


[2/6] git commit: Fixed a CHECK failure in the master when successive exited events occur for a disconnected slave.

Posted by bm...@apache.org.
Fixed a CHECK failure in the master when successive exited events
occur for a disconnected slave.

Review: https://reviews.apache.org/r/13956


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc2ab2ff
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc2ab2ff
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc2ab2ff

Branch: refs/heads/master
Commit: dc2ab2ff4b868c479701fcf6aa2d43903e32b95c
Parents: 3683ab6
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 17:53:03 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 22:48:43 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2ab2ff/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 874d7fa..a2ffe7f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -557,11 +557,7 @@ void Master::exited(const UPID& pid)
                   << "because it is not checkpointing!";
         removeSlave(slave);
         return;
-      } else {
-        CHECK(!slave->disconnected)
-              << "Slave " << slave->id << " ("
-              << slave->info.hostname() << ") already disconnected!" ;
-
+      } else if (!slave->disconnected) {
         // Mark the slave as disconnected and remove it from the allocator.
         slave->disconnected = true;
 
@@ -601,6 +597,10 @@ void Master::exited(const UPID& pid)
           // Remove and rescind offers.
           removeOffer(offer, true); // Rescind!
         }
+      } else {
+        LOG(WARNING) << "Ignoring duplicate exited() notification for "
+                     << "checkpointing slave " << slave->id
+                     << " (" << slave->info.hostname() << ")";
       }
     }
   }
@@ -1143,6 +1143,12 @@ void Master::reregisterSlave(const SlaveID& slaveId,
       reply(message);
 
       // Update the slave pid and relink to it.
+      // NOTE: Always re-linking the slave here, rather than only
+      // when the slave is disconnected, can lead to multiple exited
+      // events in succession for a disconnected slave. As a result,
+      // we ignore duplicate exited events for disconnected
+      // checkpointing slaves.
+      // See: https://issues.apache.org/jira/browse/MESOS-675
       slave->pid = from;
       link(slave->pid);
 


[6/6] git commit: Exposing cpu.stat statistics in the monitoring endpoint.

Posted by bm...@apache.org.
Exposing cpu.stat statistics in the monitoring endpoint.

From: Christina Delimitrou <ch...@gmail.com>
Review: https://reviews.apache.org/r/13868


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4954b75f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4954b75f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4954b75f

Branch: refs/heads/master
Commit: 4954b75f6fbe2d23fb3e11489441036bd3676017
Parents: 189cefe
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 3 23:12:55 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 4 00:09:09 2013 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto      |  5 +++++
 src/slave/cgroups_isolator.cpp | 23 +++++++++++++++++++++
 src/slave/constants.cpp        |  2 +-
 src/slave/monitor.cpp          | 41 +++++++++++++++++++++++++++++++++++++
 src/tests/monitor_tests.cpp    | 13 ++++++++++--
 5 files changed, 81 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index bbf1d31..8f845cc 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -274,6 +274,11 @@ message ResourceStatistics {
   // Number of CPUs allocated.
   required double cpus_limit = 4;
 
+  // cpu.stat on process throttling (for contention issues).
+  optional uint32 cpus_nr_periods = 7;
+  optional uint32 cpus_nr_throttled = 8;
+  optional double cpus_throttled_time_secs = 9;
+
   // Memory Usage Information:
   optional uint64 mem_rss_bytes = 5; // Resident Set Size.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index ec9f8ec..a1f5b32 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -782,6 +782,29 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
     result.set_mem_rss_bytes(stat.get()["rss"]);
   }
 
+  // Add the cpu.stat information.
+  stat = cgroups::stat(hierarchy, info->name(), "cpu.stat");
+
+  if (stat.isError()) {
+    return Future<ResourceStatistics>::failed(
+        "Failed to read cpu.stat: " + stat.error());
+  }
+
+  if (stat.get().contains("nr_periods")) {
+    result.set_cpus_nr_periods(
+        (uint32_t) stat.get()["nr_periods"]);
+  }
+
+  if (stat.get().contains("nr_throttled")) {
+    result.set_cpus_nr_throttled(
+        (uint32_t) stat.get()["nr_throttled"]);
+  }
+
+  if (stat.get().contains("throttled_time")) {
+    result.set_cpus_throttled_time_secs(
+        Nanoseconds(stat.get()["throttled_time"]).secs());
+  }
+
   return result;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 8c74c00..5573d39 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -30,7 +30,7 @@ const Duration GC_DELAY = Weeks(1);
 const double GC_DISK_HEADROOM = 0.1;
 const Duration DISK_WATCH_INTERVAL = Minutes(1);
 const Duration RECOVERY_TIMEOUT = Minutes(15);
-const Duration RESOURCE_MONITORING_INTERVAL = Seconds(5);
+const Duration RESOURCE_MONITORING_INTERVAL = Seconds(1);
 const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
 const uint32_t MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;
 const uint32_t MAX_COMPLETED_TASKS_PER_EXECUTOR = 200;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 8e1eb35..9cb6256 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -52,12 +52,16 @@ using process::wait; // Necessary on some OS's to disambiguate.
 // These match the names in the ResourceStatistics protobuf.
 // TODO(bmahler): Later, when we have a richer monitoring story,
 // we will want to publish these outside of this file.
+// TODO(cdel): Check if we need any more of the cgroup stats.
 const std::string CPUS_TIME_SECS        = "cpus_time_secs";
 const std::string CPUS_USER_TIME_SECS   = "cpus_user_time_secs";
 const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
 const std::string CPUS_LIMIT            = "cpus_limit";
 const std::string MEM_RSS_BYTES         = "mem_rss_bytes";
 const std::string MEM_LIMIT_BYTES       = "mem_limit_bytes";
+const std::string CPUS_NR_PERIODS       = "cpus_nr_periods";
+const std::string CPUS_NR_THROTTLED     = "cpus_nr_throttled";
+const std::string CPUS_THROTTLED_TIME_SECS = "cpus_throttled_time_secs";
 
 // TODO(bmahler): Deprecated statistical names, these will be removed!
 const std::string CPU_TIME   = "cpu_time";
@@ -126,6 +130,9 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
   ::statistics->archive("monitor", prefix + CPUS_LIMIT);
   ::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
   ::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
+  ::statistics->archive("monitor", prefix + CPUS_NR_PERIODS);
+  ::statistics->archive("monitor", prefix + CPUS_NR_THROTTLED);
+  ::statistics->archive("monitor", prefix + CPUS_THROTTLED_TIME_SECS);
 
   if (!watches.contains(frameworkId) ||
       !watches[frameworkId].contains(executorId)) {
@@ -248,6 +255,23 @@ void publish(
       prefix + MEM_LIMIT_BYTES,
       statistics.mem_limit_bytes(),
       time);
+
+  // Publish cpu.stat statistics.
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_NR_PERIODS,
+      statistics.cpus_nr_periods(),
+      time);
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_NR_THROTTLED,
+      statistics.cpus_nr_throttled(),
+      time);
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_THROTTLED_TIME_SECS,
+      statistics.cpus_throttled_time_secs(),
+      time);
 }
 
 
@@ -286,6 +310,9 @@ Future<http::Response> _statisticsJSON(
       usage.values[CPUS_LIMIT] = 0;
       usage.values[MEM_RSS_BYTES] = 0;
       usage.values[MEM_LIMIT_BYTES] = 0;
+      usage.values[CPUS_NR_PERIODS] = 0;
+      usage.values[CPUS_NR_THROTTLED] = 0;
+      usage.values[CPUS_THROTTLED_TIME_SECS] = 0;
 
       // Set the cpu usage data if present.
       if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
@@ -310,6 +337,20 @@ Future<http::Response> _statisticsJSON(
           statistics.find(prefix + MEM_LIMIT_BYTES)->second;
       }
 
+      // Set the cpu.stat data if present.
+      if (statistics.count(prefix + CPUS_NR_PERIODS) > 0) {
+        usage.values[CPUS_NR_PERIODS] =
+          statistics.find(prefix + CPUS_NR_PERIODS)->second;
+      }
+      if (statistics.count(prefix + CPUS_NR_THROTTLED) > 0) {
+        usage.values[CPUS_NR_THROTTLED] =
+          statistics.find(prefix + CPUS_NR_THROTTLED)->second;
+      }
+      if (statistics.count(prefix + CPUS_THROTTLED_TIME_SECS) > 0) {
+        usage.values[CPUS_THROTTLED_TIME_SECS] =
+          statistics.find(prefix + CPUS_THROTTLED_TIME_SECS)->second;
+      }
+
       JSON::Object entry;
       entry.values["framework_id"] = frameworkId.value();
       entry.values["executor_id"] = executorId.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/4954b75f/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 3142416..3d3f8af 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -75,15 +75,18 @@ TEST(MonitorTest, WatchUnwatch)
   ResourceStatistics initialStatistics;
   initialStatistics.set_cpus_user_time_secs(0);
   initialStatistics.set_cpus_system_time_secs(0);
-  initialStatistics.set_cpus_limit(1.0);
+  initialStatistics.set_cpus_limit(2.5);
   initialStatistics.set_mem_rss_bytes(0);
   initialStatistics.set_mem_limit_bytes(2048);
   initialStatistics.set_timestamp(Clock::now().secs());
 
   ResourceStatistics statistics;
+  statistics.set_cpus_nr_periods(100);
+  statistics.set_cpus_nr_throttled(2);
   statistics.set_cpus_user_time_secs(4);
   statistics.set_cpus_system_time_secs(1);
-  statistics.set_cpus_limit(1.0);
+  statistics.set_cpus_throttled_time_secs(0.5);
+  statistics.set_cpus_limit(2.5);
   statistics.set_mem_rss_bytes(1024);
   statistics.set_mem_limit_bytes(2048);
   statistics.set_timestamp(
@@ -183,14 +186,20 @@ TEST(MonitorTest, WatchUnwatch)
               "\"source\":\"source\","
               "\"statistics\":{"
                   "\"cpus_limit\":%g,"
+                  "\"cpus_nr_periods\":%d,"
+                  "\"cpus_nr_throttled\":%d,"
                   "\"cpus_system_time_secs\":%g,"
+                  "\"cpus_throttled_time_secs\":%g,"
                   "\"cpus_user_time_secs\":%g,"
                   "\"mem_limit_bytes\":%lu,"
                   "\"mem_rss_bytes\":%lu"
               "}"
           "}]",
           statistics.cpus_limit(),
+          statistics.cpus_nr_periods(),
+          statistics.cpus_nr_throttled(),
           statistics.cpus_system_time_secs(),
+          statistics.cpus_throttled_time_secs(),
           statistics.cpus_user_time_secs(),
           statistics.mem_limit_bytes(),
           statistics.mem_rss_bytes()).get(),


[5/6] git commit: Fixed CgroupsIsolator to listen for OOMs of recovered executors.

Posted by bm...@apache.org.
Fixed CgroupsIsolator to listen for OOMs of recovered executors.

Review: https://reviews.apache.org/r/13904


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/189cefe3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/189cefe3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/189cefe3

Branch: refs/heads/master
Commit: 189cefe3a540ee448161773f6b6b88b8e451f151
Parents: ca4580e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Aug 29 13:56:05 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Sep 3 23:08:01 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/189cefe3/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index d327d65..ec9f8ec 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -526,7 +526,7 @@ void CgroupsIsolator::launchExecutor(
                << ": " << create.error();
   }
 
-  // Setup the initial resource constrains.
+  // Setup the initial resource constraints.
   resourcesChanged(frameworkId, executorId, resources);
 
   // Start listening on OOM events.
@@ -846,11 +846,7 @@ Future<Nothing> CgroupsIsolator::recover(
                    info->destroyed,
                    info->message);
 
-          // We make a copy here because 'info' will be deleted when
-          // we unregister.
-          unregisterCgroupInfo(
-              utils::copy(info->frameworkId),
-              utils::copy(info->executorId));
+          unregisterCgroupInfo(framework.id, executor.id);
 
           continue;
         }
@@ -865,6 +861,16 @@ Future<Nothing> CgroupsIsolator::recover(
                          run.forkedPid.get(),
                          lambda::_1));
         }
+
+        // Start listening for OOMs. If the executor OOMed while the
+        // slave was down or recovering, the cgroup will already be
+        // under_oom, resulting in immediate notification.
+        // TODO(bmahler): I've been unable to find documentation
+        // guaranteeing this, but the kernel source indicates that
+        // listeners are notified if the cgroup is already under_oom.
+        if (subsystems.contains("memory")) {
+          oomListen(framework.id, executor.id);
+        }
       }
     }
   }