You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/06/11 20:36:20 UTC

[1/6] mesos git commit: Removed unused constants from ResourceMonitor.

Repository: mesos
Updated Branches:
  refs/heads/master aba253d61 -> a89ba3f19


Removed unused constants from ResourceMonitor.

Review: https://reviews.apache.org/r/35259


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eaa992a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eaa992a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eaa992a4

Branch: refs/heads/master
Commit: eaa992a47c485561d13116564f2ee6434de59164
Parents: aba253d
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Jun 8 16:28:03 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 10:40:32 2015 -0700

----------------------------------------------------------------------
 src/slave/monitor.hpp | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eaa992a4/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index bee91ba..6a51eee 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -48,10 +48,6 @@ class Containerizer;
 class ResourceMonitorProcess;
 
 
-const extern Duration MONITORING_TIME_SERIES_WINDOW;
-const extern size_t MONITORING_TIME_SERIES_CAPACITY;
-
-
 // Provides resource monitoring for containers. Usage information is
 // also exported via a JSON endpoint.
 // TODO(bmahler): Forward usage information to the master.


[2/6] mesos git commit: Refactored the ResourceMonitor to get statistics from the Slave.

Posted by ji...@apache.org.
Refactored the ResourceMonitor to get statistics from the Slave.

Review: https://reviews.apache.org/r/35260


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8274c5e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8274c5e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8274c5e9

Branch: refs/heads/master
Commit: 8274c5e9ef46e297337999760e741d93710832f8
Parents: eaa992a
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 9 11:51:22 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:01 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                  |  22 +-
 include/mesos/slave/resource_estimator.hpp |   3 +-
 src/slave/monitor.cpp                      | 296 ++++++++----------------
 src/slave/monitor.hpp                      |  99 +-------
 src/slave/resource_estimators/fixed.cpp    |  16 +-
 src/slave/resource_estimators/noop.cpp     |   6 +-
 src/slave/resource_estimators/noop.hpp     |   3 +-
 src/slave/slave.cpp                        | 121 +++++-----
 src/slave/slave.hpp                        |   3 +
 src/tests/mesos.hpp                        |   4 +-
 src/tests/monitor_tests.cpp                | 137 ++++-------
 11 files changed, 236 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 7457ff1..2034009 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -620,14 +620,24 @@ message ResourceStatistics {
 
 
 /**
- * Describes a snapshot of the resource usage for an executor.
+ * Describes a snapshot of the resource usage for executors.
  */
 message ResourceUsage {
-  // Source of the collected statistics.
-  optional ExecutorInfo executor_info = 1;
-  // Current resource usage.
-  // If missing, the isolation module cannot provide resource usage.
-  optional ResourceStatistics statistics = 2;
+  message Executor {
+    required ExecutorInfo executor_info = 1;
+
+    // This includes resources used by the executor itself
+    // as well as its active tasks.
+    repeated Resource allocated = 2;
+
+    // Current resource usage. If absent, the containerizer
+    // cannot provide resource usage.
+    optional ResourceStatistics statistics = 3;
+  }
+
+  repeated Executor executors = 1;
+
+  // TODO(jieyu): Include slave's total resources here.
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 7f78fd8..731ec3a 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -52,8 +52,7 @@ public:
   // resource estimator to fetch the current resource usage for each
   // executor on slave.
   virtual Try<Nothing> initialize(
-      const lambda::function<
-          process::Future<std::list<ResourceUsage>>()>& usages) = 0;
+      const lambda::function<process::Future<ResourceUsage>()>& usage) = 0;
 
   // Returns the current estimation about the *maximum* amount of
   // resources that can be oversubscribed on the slave. A new

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 8f7ff63..82aa659 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -16,256 +16,156 @@
  * limitations under the License.
  */
 
-#include <list>
-#include <map>
 #include <string>
 
-#include <mesos/mesos.hpp>
+#include <glog/logging.h>
 
-#include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
-#include <process/delay.hpp>
 #include <process/future.hpp>
 #include <process/help.hpp>
 #include <process/http.hpp>
+#include <process/limiter.hpp>
 #include <process/process.hpp>
-#include <process/statistics.hpp>
 
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
 #include <stout/protobuf.hpp>
 
-#include "slave/containerizer/containerizer.hpp"
 #include "slave/monitor.hpp"
 
 using namespace process;
 
-using std::list;
-using std::make_pair;
-using std::map;
 using std::string;
 
 namespace mesos {
 namespace internal {
 namespace slave {
 
-using process::wait; // Necessary on some OS's to disambiguate.
-
-
-Future<Nothing> ResourceMonitorProcess::start(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo)
+static const string STATISTICS_HELP()
 {
-  if (monitored.contains(containerId)) {
-    return Failure("Already monitored");
-  }
-
-  monitored[containerId] = executorInfo;
-
-  return Nothing();
+  return HELP(
+      TLDR(
+          "Retrieve resource monitoring information."),
+      USAGE(
+          "/statistics.json"),
+      DESCRIPTION(
+          "Returns the current resource consumption data for containers",
+          "running under this slave.",
+          "",
+          "Example:",
+          "",
+          "```",
+          "[{",
+          "    \"executor_id\":\"executor\",",
+          "    \"executor_name\":\"name\",",
+          "    \"framework_id\":\"framework\",",
+          "    \"source\":\"source\",",
+          "    \"statistics\":",
+          "    {",
+          "        \"cpus_limit\":8.25,",
+          "        \"cpus_nr_periods\":769021,",
+          "        \"cpus_nr_throttled\":1046,",
+          "        \"cpus_system_time_secs\":34501.45,",
+          "        \"cpus_throttled_time_secs\":352.597023453,",
+          "        \"cpus_user_time_secs\":96348.84,",
+          "        \"mem_anon_bytes\":4845449216,",
+          "        \"mem_file_bytes\":260165632,",
+          "        \"mem_limit_bytes\":7650410496,",
+          "        \"mem_mapped_file_bytes\":7159808,",
+          "        \"mem_rss_bytes\":5105614848,",
+          "        \"timestamp\":1388534400.0",
+          "    }",
+          "}]",
+          "```"));
 }
 
 
-Future<Nothing> ResourceMonitorProcess::stop(
-    const ContainerID& containerId)
+class ResourceMonitorProcess : public Process<ResourceMonitorProcess>
 {
-  if (!monitored.contains(containerId)) {
-    return Failure("Not monitored");
+public:
+  explicit ResourceMonitorProcess(
+      const lambda::function<Future<ResourceUsage>()>& _usage)
+    : ProcessBase("monitor"),
+      usage(_usage),
+      limiter(2, Seconds(1)) {} // 2 permits per second.
+
+  virtual ~ResourceMonitorProcess() {}
+
+protected:
+  virtual void initialize()
+  {
+    route("/statistics.json",
+          STATISTICS_HELP(),
+          &ResourceMonitorProcess::statistics);
   }
 
-  monitored.erase(containerId);
-
-  return Nothing();
-}
-
-
-Future<list<ResourceUsage>> ResourceMonitorProcess::usages()
-{
-  list<Future<ResourceUsage>> futures;
-
-  foreachkey (const ContainerID& containerId, monitored) {
-    futures.push_back(usage(containerId));
+private:
+  // Returns the monitoring statistics. Requests have no parameters.
+  Future<http::Response> statistics(const http::Request& request)
+  {
+    return limiter.acquire()
+      .then(defer(self(), &Self::_statistics, request));
   }
 
-  return await(futures)
-    .then(defer(self(), &ResourceMonitorProcess::_usages, lambda::_1));
-}
-
-
-list<ResourceUsage> ResourceMonitorProcess::_usages(
-    list<Future<ResourceUsage>> futures)
-{
-  list<ResourceUsage> result;
-  foreach(const Future<ResourceUsage>& future, futures) {
-    if (future.isReady()) {
-      result.push_back(future.get());
-    }
+  Future<http::Response> _statistics(const http::Request& request)
+  {
+    return usage()
+      .then(defer(self(), &Self::__statistics, lambda::_1, request));
   }
 
-  return result;
-}
-
-
-Future<ResourceUsage> ResourceMonitorProcess::usage(
-    ContainerID containerId)
-{
-  if (!monitored.contains(containerId)) {
-    return Failure("Not monitored");
-  }
-
-  ExecutorInfo executorInfo = monitored[containerId];
-
-  return containerizer->usage(containerId)
-    .then(defer(
-        self(),
-        &ResourceMonitorProcess::_usage,
-        executorInfo,
-        lambda::_1))
-    .onFailed([containerId, executorInfo](const string& failure) {
-      LOG(WARNING) << "Failed to get resource usage for "
-                   << " container " << containerId
-                   << " for executor " << executorInfo.executor_id()
-                   << " of framework " << executorInfo.framework_id()
-                   << ": " << failure;
-    })
-    .onDiscarded([containerId, executorInfo]() {
-      LOG(WARNING) << "Failed to get resource usage for "
-                   << " container " << containerId
-                   << " for executor " << executorInfo.executor_id()
-                   << " of framework " << executorInfo.framework_id()
-                   << ": future discarded";
-    });
-}
-
-
-ResourceUsage ResourceMonitorProcess::_usage(
-    const ExecutorInfo& executorInfo,
-    const ResourceStatistics& statistics)
-{
-  ResourceUsage usage;
-  usage.mutable_executor_info()->CopyFrom(executorInfo);
-  usage.mutable_statistics()->CopyFrom(statistics);
-
-  return usage;
-}
-
-
-Future<http::Response> ResourceMonitorProcess::statistics(
-    const http::Request& request)
-{
-  return limiter.acquire()
-    .then(defer(self(), &Self::_statistics, request));
-}
-
+  Future<http::Response> __statistics(
+      const Future<ResourceUsage>& future,
+      const http::Request& request)
+  {
+    if (!future.isReady()) {
+      LOG(WARNING) << "Could not collect resource usage: "
+                   << (future.isFailed() ? future.failure() : "discarded");
 
-Future<http::Response> ResourceMonitorProcess::_statistics(
-    const http::Request& request)
-{
-  return usages()
-    .then(defer(self(), &Self::__statistics, lambda::_1, request));
-}
+      return http::InternalServerError();
+    }
 
+    JSON::Array result;
 
-Future<http::Response> ResourceMonitorProcess::__statistics(
-    const Future<list<ResourceUsage>>& futures,
-    const http::Request& request)
-{
-  if (!futures.isReady()) {
-    LOG(WARNING) << "Could not collect usage statistics";
-    return http::InternalServerError();
-  }
+    foreach (const ResourceUsage::Executor& executor,
+             future.get().executors()) {
+      if (executor.has_statistics()) {
+        const ExecutorInfo info = executor.executor_info();
 
-  JSON::Array result;
+        JSON::Object entry;
+        entry.values["framework_id"] = info.framework_id().value();
+        entry.values["executor_id"] = info.executor_id().value();
+        entry.values["executor_name"] = info.name();
+        entry.values["source"] = info.source();
+        entry.values["statistics"] = JSON::Protobuf(executor.statistics());
 
-  foreach (const ResourceUsage& usage, futures.get()) {
-    JSON::Object entry;
-    entry.values["framework_id"] = usage.executor_info().framework_id().value();
-    entry.values["executor_id"] = usage.executor_info().executor_id().value();
-    entry.values["executor_name"] = usage.executor_info().name();
-    entry.values["source"] = usage.executor_info().source();
-    entry.values["statistics"] = JSON::Protobuf(usage.statistics());
+        result.values.push_back(entry);
+      }
+    }
 
-    result.values.push_back(entry);
+    return http::OK(result, request.query.get("jsonp"));
   }
 
-  return http::OK(result, request.query.get("jsonp"));
-}
+  // Callback used to retrieve resource usage information from slave.
+  const lambda::function<Future<ResourceUsage>()> usage;
 
+  // Used to rate limit the statistics.json endpoint.
+  RateLimiter limiter;
+};
 
-const string ResourceMonitorProcess::STATISTICS_HELP = HELP(
-    TLDR(
-        "Retrieve resource monitoring information."),
-    USAGE(
-        "/statistics.json"),
-    DESCRIPTION(
-        "Returns the current resource consumption data for containers",
-        "running under this slave.",
-        "",
-        "Example:",
-        "",
-        "```",
-        "[{",
-        "    \"executor_id\":\"executor\",",
-        "    \"executor_name\":\"name\",",
-        "    \"framework_id\":\"framework\",",
-        "    \"source\":\"source\",",
-        "    \"statistics\":",
-        "    {",
-        "        \"cpus_limit\":8.25,",
-        "        \"cpus_nr_periods\":769021,",
-        "        \"cpus_nr_throttled\":1046,",
-        "        \"cpus_system_time_secs\":34501.45,",
-        "        \"cpus_throttled_time_secs\":352.597023453,",
-        "        \"cpus_user_time_secs\":96348.84,",
-        "        \"mem_anon_bytes\":4845449216,",
-        "        \"mem_file_bytes\":260165632,",
-        "        \"mem_limit_bytes\":7650410496,",
-        "        \"mem_mapped_file_bytes\":7159808,",
-        "        \"mem_rss_bytes\":5105614848,",
-        "        \"timestamp\":1388534400.0",
-        "    }",
-        "}]",
-        "```"));
 
-
-ResourceMonitor::ResourceMonitor(Containerizer* containerizer)
+ResourceMonitor::ResourceMonitor(
+    const lambda::function<Future<ResourceUsage>()>& usage)
+  : process(new ResourceMonitorProcess(usage))
 {
-  process = new ResourceMonitorProcess(containerizer);
-  spawn(process);
+  spawn(process.get());
 }
 
 
 ResourceMonitor::~ResourceMonitor()
 {
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<Nothing> ResourceMonitor::start(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo)
-{
-  return dispatch(
-      process,
-      &ResourceMonitorProcess::start,
-      containerId,
-      executorInfo);
-}
-
-
-Future<Nothing> ResourceMonitor::stop(
-    const ContainerID& containerId)
-{
-  return dispatch(process, &ResourceMonitorProcess::stop, containerId);
-}
-
-
-Future<list<ResourceUsage>> ResourceMonitor::usages()
-{
-  return dispatch(process, &ResourceMonitorProcess::usages);
+  terminate(process.get());
+  wait(process.get());
 }
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 6a51eee..0dff109 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -19,119 +19,32 @@
 #ifndef __SLAVE_MONITOR_HPP__
 #define __SLAVE_MONITOR_HPP__
 
-#include <map>
-#include <string>
-
-#include <boost/circular_buffer.hpp>
-
 #include <mesos/mesos.hpp>
-#include <mesos/type_utils.hpp>
 
 #include <process/future.hpp>
-#include <process/limiter.hpp>
 #include <process/owned.hpp>
-#include <process/statistics.hpp>
 
-#include <stout/cache.hpp>
-#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
+#include <stout/lambda.hpp>
 
 namespace mesos {
 namespace internal {
 namespace slave {
 
 // Forward declarations.
-class Containerizer;
 class ResourceMonitorProcess;
 
 
-// Provides resource monitoring for containers. Usage information is
-// also exported via a JSON endpoint.
-// TODO(bmahler): Forward usage information to the master.
+// Exposes resources usage information via a JSON endpoint.
 class ResourceMonitor
 {
 public:
-  explicit ResourceMonitor(Containerizer* containerizer);
-  ~ResourceMonitor();
-
-  // Starts monitoring resources for the given container.
-  // Returns a failure if the container is already being watched.
-  process::Future<Nothing> start(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo);
-
-  // Stops monitoring resources for the given container.
-  // Returns a failure if the container is unknown to the monitor.
-  process::Future<Nothing> stop(
-      const ContainerID& containerId);
-
-  process::Future<std::list<ResourceUsage>> usages();
-
-private:
-  ResourceMonitorProcess* process;
-};
-
-
-class ResourceMonitorProcess : public process::Process<ResourceMonitorProcess>
-{
-public:
-  explicit ResourceMonitorProcess(Containerizer* _containerizer)
-    : ProcessBase("monitor"),
-      containerizer(_containerizer),
-      limiter(2, Seconds(1)) {} // 2 permits per second.
+  explicit ResourceMonitor(
+      const lambda::function<process::Future<ResourceUsage>()>& usage);
 
-  virtual ~ResourceMonitorProcess() {}
-
-  process::Future<Nothing> start(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo);
-
-  process::Future<Nothing> stop(
-      const ContainerID& containerId);
-
-  process::Future<std::list<ResourceUsage>> usages();
-
-protected:
-  virtual void initialize()
-  {
-    route("/statistics.json",
-          STATISTICS_HELP,
-          &ResourceMonitorProcess::statistics);
-  }
+  ~ResourceMonitor();
 
 private:
-  // Helper for returning the usage for a particular executor.
-  process::Future<ResourceUsage> usage(ContainerID containerId);
-
-  ResourceUsage _usage(
-    const ExecutorInfo& executorInfo,
-    const ResourceStatistics& statistics);
-
-  std::list<ResourceUsage> _usages(
-      std::list<process::Future<ResourceUsage>> future);
-
-  // HTTP Endpoints.
-  // Returns the monitoring statistics. Requests have no parameters.
-  process::Future<process::http::Response> statistics(
-      const process::http::Request& request);
-  process::Future<process::http::Response> _statistics(
-      const process::http::Request& request);
-  process::Future<process::http::Response> __statistics(
-      const process::Future<std::list<ResourceUsage>>& futures,
-      const process::http::Request& request);
-
-  static const std::string STATISTICS_HELP;
-
-  Containerizer* containerizer;
-
-  // Used to rate limit the statistics.json endpoint.
-  process::RateLimiter limiter;
-
-  // The executor info is stored for each monitored container.
-  hashmap<ContainerID, ExecutorInfo> monitored;
+  process::Owned<ResourceMonitorProcess> process;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index 3efa18d..08a712f 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -16,10 +16,6 @@
  * limitations under the License.
  */
 
-#include <list>
-
-#include <mesos/resources.hpp>
-
 #include <mesos/module/resource_estimator.hpp>
 
 #include <mesos/slave/resource_estimator.hpp>
@@ -37,17 +33,15 @@ using mesos::modules::Module;
 
 using mesos::slave::ResourceEstimator;
 
-using std::list;
-
 
 class FixedResourceEstimatorProcess
   : public Process<FixedResourceEstimatorProcess>
 {
 public:
   FixedResourceEstimatorProcess(
-      const lambda::function<Future<list<ResourceUsage>>()>& _usages,
+      const lambda::function<Future<ResourceUsage>()>& _usage,
       const Resources& _resources)
-    : usages(_usages),
+    : usage(_usage),
       resources(_resources) {}
 
   Future<Resources> oversubscribable()
@@ -57,7 +51,7 @@ public:
   }
 
 protected:
-  const lambda::function<Future<list<ResourceUsage>>()>& usages;
+  const lambda::function<Future<ResourceUsage>()>& usage;
   const Resources resources;
 };
 
@@ -83,13 +77,13 @@ public:
   }
 
   virtual Try<Nothing> initialize(
-      const lambda::function<Future<list<ResourceUsage>>()>& usages)
+      const lambda::function<Future<ResourceUsage>()>& usage)
   {
     if (process.get() != NULL) {
       return Error("Fixed resource estimator has already been initialized");
     }
 
-    process.reset(new FixedResourceEstimatorProcess(usages, resources));
+    process.reset(new FixedResourceEstimatorProcess(usage, resources));
     spawn(process.get());
 
     return Nothing();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/noop.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/noop.cpp b/src/slave/resource_estimators/noop.cpp
index 5f135ff..e450b44 100644
--- a/src/slave/resource_estimators/noop.cpp
+++ b/src/slave/resource_estimators/noop.cpp
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#include <list>
-
 #include <process/dispatch.hpp>
 #include <process/process.hpp>
 
@@ -27,8 +25,6 @@
 
 using namespace process;
 
-using std::list;
-
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -54,7 +50,7 @@ NoopResourceEstimator::~NoopResourceEstimator()
 
 
 Try<Nothing> NoopResourceEstimator::initialize(
-    const lambda::function<Future<list<ResourceUsage>>()>& usages)
+    const lambda::function<Future<ResourceUsage>()>& usage)
 {
   if (process.get() != NULL) {
     return Error("Noop resource estimator has already been initialized");

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/resource_estimators/noop.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/noop.hpp b/src/slave/resource_estimators/noop.hpp
index 7a44e6d..46f64c7 100644
--- a/src/slave/resource_estimators/noop.hpp
+++ b/src/slave/resource_estimators/noop.hpp
@@ -42,8 +42,7 @@ public:
   virtual ~NoopResourceEstimator();
 
   virtual Try<Nothing> initialize(
-      const lambda::function<
-          process::Future<std::list<ResourceUsage>>()>& usages);
+      const lambda::function<process::Future<ResourceUsage>()>& usage);
 
   virtual process::Future<Resources> oversubscribable();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0a2cd16..5f00e6e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -36,6 +36,7 @@
 
 #include <process/async.hpp>
 #include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
@@ -130,7 +131,7 @@ Slave::Slave(const slave::Flags& _flags,
     files(_files),
     metrics(*this),
     gc(_gc),
-    monitor(containerizer),
+    monitor(defer(self(), &Self::usage)),
     statusUpdateManager(_statusUpdateManager),
     metaDir(paths::getMetaRootDir(flags.work_dir)),
     recoveryErrors(0),
@@ -327,7 +328,7 @@ void Slave::initialize()
   }
 
   Try<Nothing> initialize = resourceEstimator->initialize(
-      lambda::bind(&ResourceMonitor::usages, &monitor));
+      defer(self(), &Self::usage));
 
   if (initialize.isError()) {
     EXIT(1) << "Failed to initialize the resource estimator: "
@@ -2327,21 +2328,6 @@ void Slave::registerExecutor(
 }
 
 
-void _monitor(
-    const Future<Nothing>& monitor,
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
-    const ContainerID& containerId)
-{
-  if (!monitor.isReady()) {
-    LOG(ERROR) << "Failed to monitor container '" << containerId
-               << "' for executor '" << executorId
-               << "' of framework '" << frameworkId
-               << ":" << (monitor.isFailed() ? monitor.failure() : "discarded");
-  }
-}
-
-
 void Slave::reregisterExecutor(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -2428,18 +2414,6 @@ void Slave::reregisterExecutor(
             executorId,
             executor->containerId));
 
-      // Monitor the executor.
-      // TODO(jieyu): Do not start the monitor if the containerizer
-      // update fails.
-      monitor.start(
-          executor->containerId,
-          executor->info)
-        .onAny(lambda::bind(_monitor,
-                            lambda::_1,
-                            framework->id(),
-                            executor->id,
-                            executor->containerId));
-
       hashmap<TaskID, TaskInfo> unackedTasks;
       foreach (const TaskInfo& task, tasks) {
         unackedTasks[task.task_id()] = task;
@@ -3208,18 +3182,6 @@ void Slave::executorLaunched(
       break;
     case Executor::REGISTERING:
     case Executor::RUNNING:
-      LOG(INFO) << "Monitoring executor '" << executorId
-                << "' of framework '" << frameworkId
-                << "' in container '" << containerId << "'";
-      // Start monitoring the container's resources.
-      monitor.start(
-          containerId,
-          executor->info)
-        .onAny(lambda::bind(_monitor,
-                            lambda::_1,
-                            frameworkId,
-                            executorId,
-                            containerId));
       break;
     case Executor::TERMINATED:
     default:
@@ -3231,12 +3193,6 @@ void Slave::executorLaunched(
 }
 
 
-void _unmonitor(
-    const Future<Nothing>& watch,
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId);
-
-
 // Called by the isolator when an executor process terminates.
 void Slave::executorTerminated(
     const FrameworkID& frameworkId,
@@ -3298,10 +3254,6 @@ void Slave::executorTerminated(
 
       executor->state = Executor::TERMINATED;
 
-      // Stop monitoring the executor's container.
-      monitor.stop(executor->containerId)
-        .onAny(lambda::bind(_unmonitor, lambda::_1, frameworkId, executorId));
-
       // Transition all live tasks to TASK_LOST/TASK_FAILED.
       // If the containerizer killed the executor (e.g., due to OOM event)
       // or if this is a command executor, we send TASK_FAILED status updates
@@ -3506,19 +3458,6 @@ void Slave::removeFramework(Framework* framework)
 }
 
 
-void _unmonitor(
-    const Future<Nothing>& unmonitor,
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId)
-{
-  if (!unmonitor.isReady()) {
-    LOG(ERROR) << "Failed to unmonitor container for executor " << executorId
-               << " of framework " << frameworkId << ": "
-               << (unmonitor.isFailed() ? unmonitor.failure() : "discarded");
-  }
-}
-
-
 void Slave::shutdownExecutor(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -4196,6 +4135,54 @@ void Slave::qosCorrections(
 }
 
 
+Future<ResourceUsage> Slave::usage()
+{
+  // NOTE: We use 'Owned' here trying to avoid the expensive copy.
+  // C++11 lambda only supports capturing variables that have copy
+  // constructors. Revisit once we remove the copy constructor for
+  // Owned (or C++14 lambda generalized capture is supported).
+  Owned<ResourceUsage> usage(new ResourceUsage());
+  list<Future<ResourceStatistics>> futures;
+
+  foreachvalue (const Framework* framework, frameworks) {
+    foreachvalue (const Executor* executor, framework->executors) {
+      ResourceUsage::Executor* entry = usage->add_executors();
+      entry->mutable_executor_info()->CopyFrom(executor->info);
+      entry->mutable_allocated()->CopyFrom(executor->resources);
+
+      futures.push_back(containerizer->usage(executor->containerId));
+    }
+  }
+
+  return await(futures).then(
+      [usage](const list<Future<ResourceStatistics>>& futures)
+        -> Future<ResourceUsage> {
+        // NOTE: We add ResourceUsage::Executor to 'usage' the same
+        // order as we push future to 'futures'. So the variables
+        // 'future' and 'executor' below should be in sync.
+        CHECK_EQ(futures.size(), usage->executors_size());
+
+        size_t i = 0;
+        foreach (const Future<ResourceStatistics>& future, futures) {
+          ResourceUsage::Executor* executor = usage->mutable_executors(i++);
+
+          if (future.isReady()) {
+            executor->mutable_statistics()->CopyFrom(future.get());
+          } else {
+            LOG(WARNING) << "Failed to get resource statistics for executor '"
+                         << executor->executor_info().executor_id() << "'"
+                         << " of framework "
+                         << executor->executor_info().framework_id() << ": "
+                         << (future.isFailed() ? future.failure()
+                                               : "discarded");
+          }
+        }
+
+        return *usage;
+      });
+}
+
+
 // TODO(dhamon): Move these to their own metrics.hpp|cpp.
 double Slave::_tasks_staging()
 {
@@ -4465,9 +4452,9 @@ Executor* Framework::launchExecutor(
     const TaskInfo& taskInfo)
 {
   // Generate an ID for the executor's container.
-  // TODO(idownes) This should be done by the containerizer but we need the
-  // ContainerID to create the executor's directory and to set up monitoring.
-  // Fix this when 'launchExecutor()' is handled asynchronously.
+  // TODO(idownes) This should be done by the containerizer but we
+  // need the ContainerID to create the executor's directory. Fix
+  // this when 'launchExecutor()' is handled asynchronously.
   ContainerID containerId;
   containerId.set_value(UUID::random().toString());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 98c64f6..0df1b55 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -358,6 +358,9 @@ public:
       const process::Future<std::list<
           mesos::slave::QoSCorrection>>& correction);
 
+  // Returns the resource usage information for all executors.
+  process::Future<ResourceUsage> usage();
+
 private:
   void _authenticate();
   void authenticationTimeout(process::Future<bool> future);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 50b0061..e19ef98 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -738,9 +738,7 @@ public:
 
   MOCK_METHOD1(
       initialize,
-      Try<Nothing>(
-          const lambda::function<
-              process::Future<std::list<ResourceUsage>>()>&));
+      Try<Nothing>(const lambda::function<process::Future<ResourceUsage>()>&));
 
   MOCK_METHOD0(
       oversubscribable,

http://git-wip-us.apache.org/repos/asf/mesos/blob/8274c5e9/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 6de8b1f..197c153 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -17,13 +17,10 @@
  */
 
 #include <limits>
-#include <map>
-
-#include <gmock/gmock.h>
 
 #include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
 
-#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -33,32 +30,19 @@
 
 #include <stout/nothing.hpp>
 
-#include "slave/constants.hpp"
 #include "slave/monitor.hpp"
 
-#include "tests/containerizer.hpp"
-#include "tests/mesos.hpp"
-
-using process::Clock;
-using process::Future;
+using namespace process;
 
-using process::http::BadRequest;
-using process::http::NotFound;
-using process::http::OK;
-using process::http::Response;
+using mesos::internal::slave::ResourceMonitor;
 
 using std::numeric_limits;
 using std::string;
 
-using testing::_;
-using testing::DoAll;
-using testing::Return;
-
 namespace mesos {
 namespace internal {
 namespace tests {
 
-
 TEST(MonitorTest, Statistics)
 {
   FrameworkID frameworkId;
@@ -67,9 +51,6 @@ TEST(MonitorTest, Statistics)
   ExecutorID executorId;
   executorId.set_value("executor");
 
-  ContainerID containerId;
-  containerId.set_value("container");
-
   ExecutorInfo executorInfo;
   executorInfo.mutable_executor_id()->CopyFrom(executorId);
   executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
@@ -90,37 +71,24 @@ TEST(MonitorTest, Statistics)
   statistics.set_mem_limit_bytes(2048);
   statistics.set_timestamp(0);
 
-  TestContainerizer containerizer;
-
-  Future<Nothing> usage;
-  EXPECT_CALL(containerizer, usage(containerId))
-    .WillOnce(DoAll(FutureSatisfy(&usage),
-                    Return(statistics)));
-
-  slave::ResourceMonitor monitor(&containerizer);
-
-  // We pause the clock first to ensure unexpected collections
-  // are avoided.
-  process::Clock::pause();
+  ResourceMonitor monitor([=]() -> Future<ResourceUsage> {
+    Resources resources = Resources::parse("cpus:1;mem:2").get();
 
-  monitor.start(
-      containerId,
-      executorInfo);
+    ResourceUsage usage;
+    ResourceUsage::Executor* executor = usage.add_executors();
+    executor->mutable_executor_info()->CopyFrom(executorInfo);
+    executor->mutable_allocated()->CopyFrom(resources);
+    executor->mutable_statistics()->CopyFrom(statistics);
 
-  // Now wait for ResouorceMonitorProcess::watch to finish.
-  process::Clock::settle();
+    return usage;
+  });
 
-  process::UPID upid("monitor", process::address());
-
-  // Request the statistics, this will ask the isolator.
-  Future<Response> response = process::http::get(upid, "statistics.json");
+  UPID upid("monitor", address());
 
+  Future<http::Response> response = http::get(upid, "statistics.json");
   AWAIT_READY(response);
 
-  // The collection should have occurred on the isolator.
-  ASSERT_TRUE(usage.isReady());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(
       "application/json",
       "Content-Type",
@@ -164,24 +132,23 @@ TEST(MonitorTest, Statistics)
           statistics.mem_rss_bytes(),
           statistics.timestamp()).get(),
       response);
+}
 
-  // Ensure the monitor stops polling the isolator.
-  monitor.stop(containerId);
-
-  // Wait until ResourceMonitorProcess::stop has completed.
-  process::Clock::settle();
 
-  // This time, Containerizer::usage should not get called.
-  EXPECT_CALL(containerizer, usage(containerId))
-    .Times(0);
+// This test verifies the correct handling of the statistics.json
+// endpoint when there is no executor running.
+TEST(MonitorTest, NoExecutor)
+{
+  ResourceMonitor monitor([]() -> Future<ResourceUsage> {
+    return ResourceUsage();
+  });
 
-  response = process::http::get(upid, "statistics.json");
+  UPID upid("monitor", address());
 
-  // Ensure the rate limiter acquires its permit.
-  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
-  process::Clock::settle();
+  Future<http::Response> response = http::get(upid, "statistics.json");
+  AWAIT_READY(response);
 
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(
       "application/json",
       "Content-Type",
@@ -190,43 +157,39 @@ TEST(MonitorTest, Statistics)
 }
 
 
-// Test for correct handling of the statistics.json endpoint when
-// monitoring of a container is stopped.
-TEST(MonitorTest, UsageFailure)
+// This test verifies the correct handling of the statistics.json
+// endpoint when statistics is missing in ResourceUsage.
+TEST(MonitorTest, MissingStatistics)
 {
-  TestContainerizer containerizer;
-
-  // Test containerizer is set up to:
-  // 1) Synchronize test with Containerizer::usage()
-  // 2) After that, stop monitoring the container.
-  Future<Nothing> usage;
-  process::Promise<ResourceStatistics> failPromise;
-  EXPECT_CALL(containerizer, usage(DEFAULT_CONTAINER_ID))
-    .WillOnce(DoAll(FutureSatisfy(&usage),
-                    Return(failPromise.future())));
+  ResourceMonitor monitor([]() -> Future<ResourceUsage> {
+    FrameworkID frameworkId;
+    frameworkId.set_value("framework");
 
-  slave::ResourceMonitor monitor(&containerizer);
+    ExecutorID executorId;
+    executorId.set_value("executor");
 
-  AWAIT_READY(monitor.start(DEFAULT_CONTAINER_ID, DEFAULT_EXECUTOR_INFO));
+    ExecutorInfo executorInfo;
+    executorInfo.mutable_executor_id()->CopyFrom(executorId);
+    executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+    executorInfo.set_name("name");
+    executorInfo.set_source("source");
 
-  // Induce a call to usage().
-  process::UPID upid("monitor", process::address());
-  Future<Response> response = process::http::get(upid, "statistics.json");
+    Resources resources = Resources::parse("cpus:1;mem:2").get();
 
-  // Usage was called, but Future<ResourceStatistics> is still
-  // unsatisfied and monitor is blocked.
-  AWAIT_READY(usage);
+    ResourceUsage usage;
+    ResourceUsage::Executor* executor = usage.add_executors();
+    executor->mutable_executor_info()->CopyFrom(executorInfo);
+    executor->mutable_allocated()->CopyFrom(resources);
 
-  // Stop monitoring the container.
-  AWAIT_READY(monitor.stop(DEFAULT_CONTAINER_ID));
+    return usage;
+  });
 
-  // Fail the future to the collected container statistic.
-  failPromise.set(process::Failure("Injected failure"));
+  UPID upid("monitor", address());
 
-  // Verify an empty response.
+  Future<http::Response> response = http::get(upid, "statistics.json");
   AWAIT_READY(response);
 
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(
       "application/json",
       "Content-Type",


[5/6] mesos git commit: Fixed a bug in the fixed resource estimator.

Posted by ji...@apache.org.
Fixed a bug in the fixed resource estimator.

Review: https://reviews.apache.org/r/35320


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f0bf57b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f0bf57b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f0bf57b

Branch: refs/heads/master
Commit: 9f0bf57b5901601e16f92d04d004f8e25e3f085f
Parents: a97434b
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 10 12:42:58 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:06 2015 -0700

----------------------------------------------------------------------
 src/slave/resource_estimators/fixed.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f0bf57b/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index 08a712f..ca16137 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -51,7 +51,7 @@ public:
   }
 
 protected:
-  const lambda::function<Future<ResourceUsage>()>& usage;
+  const lambda::function<Future<ResourceUsage>()> usage;
   const Resources resources;
 };
 


[6/6] mesos git commit: Implemented the fixed resource estimator.

Posted by ji...@apache.org.
Implemented the fixed resource estimator.

Review: https://reviews.apache.org/r/35321


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a89ba3f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a89ba3f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a89ba3f1

Branch: refs/heads/master
Commit: a89ba3f1934e02e585ddcbe15029c41baefc5f23
Parents: 9f0bf57
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 10 12:47:30 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:06 2015 -0700

----------------------------------------------------------------------
 src/slave/resource_estimators/fixed.cpp | 30 ++++++++-----
 src/tests/oversubscription_tests.cpp    | 65 +++++++++++++++++++++++++++-
 2 files changed, 84 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a89ba3f1/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index ca16137..305664c 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -20,6 +20,7 @@
 
 #include <mesos/slave/resource_estimator.hpp>
 
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -40,30 +41,39 @@ class FixedResourceEstimatorProcess
 public:
   FixedResourceEstimatorProcess(
       const lambda::function<Future<ResourceUsage>()>& _usage,
-      const Resources& _resources)
+      const Resources& _totalRevocable)
     : usage(_usage),
-      resources(_resources) {}
+      totalRevocable(_totalRevocable) {}
 
   Future<Resources> oversubscribable()
   {
-    // TODO(jieyu): This is a stub implementation.
-    return resources;
+    return usage().then(defer(self(), &Self::_oversubscribable, lambda::_1));
+  }
+
+  Future<Resources> _oversubscribable(const ResourceUsage& usage)
+  {
+    Resources allocatedRevocable; // Summed over all executors in 'usage'.
+    foreach (const ResourceUsage::Executor& executor, usage.executors()) {
+      allocatedRevocable += Resources(executor.allocated()).revocable();
+    }
+
+    return totalRevocable - allocatedRevocable; // Not yet allocated.
   }
 
 protected:
   const lambda::function<Future<ResourceUsage>()> usage;
-  const Resources resources;
+  const Resources totalRevocable;
 };
 
 
 class FixedResourceEstimator : public ResourceEstimator
 {
 public:
-  FixedResourceEstimator(const Resources& _resources)
-    : resources(_resources)
+  FixedResourceEstimator(const Resources& _totalRevocable)
+    : totalRevocable(_totalRevocable)
   {
     // Mark all resources as revocable.
-    foreach (Resource& resource, resources) {
+    foreach (Resource& resource, totalRevocable) {
       resource.mutable_revocable();
     }
   }
@@ -83,7 +93,7 @@ public:
       return Error("Fixed resource estimator has already been initialized");
     }
 
-    process.reset(new FixedResourceEstimatorProcess(usage, resources));
+    process.reset(new FixedResourceEstimatorProcess(usage, totalRevocable));
     spawn(process.get());
 
     return Nothing();
@@ -101,7 +111,7 @@ public:
   }
 
 private:
-  Resources resources;
+  Resources totalRevocable;
   Owned<FixedResourceEstimatorProcess> process;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a89ba3f1/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 58a20d4..e8ae053 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -425,9 +425,72 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
   AWAIT_READY(update);
 
   Resources resources = update.get().oversubscribed_resources();
-
   EXPECT_SOME_EQ(2.0, resources.cpus());
 
+  Clock::resume();
+
+  // Launch a task that uses revocable resources and verify that the
+  // total oversubscribed resources does not change.
+
+  // We don't expect to receive an UpdateSlaveMessage because the
+  // total oversubscribed resources does not change.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateSlaveMessage(), _, _);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  // The offer should contain revocable resources.
+  ASSERT_SOME_EQ(2.0, Resources(offer.resources()).revocable().cpus());
+
+  // Now, launch a task that uses revocable resources.
+  Resources taskResources = createRevocableResources("cpus", "1");
+  taskResources += Resources::parse("mem:32").get();
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      taskResources,
+      "sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Advance the clock for the slave to trigger the calculation of the
+  // total oversubscribed resources. As we described above, we don't
+  // expect a new UpdateSlaveMessage being generated.
+  Clock::pause();
+  Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
   Shutdown();
 }
 


[4/6] mesos git commit: Added a SlaveTest to capture the case when containerizer usage fails.

Posted by ji...@apache.org.
Added a SlaveTest to capture the case when containerizer usage fails.

Review: https://reviews.apache.org/r/35305


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a97434bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a97434bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a97434bd

Branch: refs/heads/master
Commit: a97434bde5dd1722c0fe2985c370b67f3eb5b049
Parents: b3a006e
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 10 11:17:53 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:06 2015 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 75 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a97434bd/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index acae497..70fc42d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1760,6 +1760,81 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
 }
 
 
+// This test verifies that the slave properly handles the case where
+// the containerizer's usage() call fails while the slave is collecting
+// the resource usage information.
+TEST_F(SlaveTest, ContainerizerUsageFailure)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
+  spawn(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:0.1;mem:32").get(),
+      "sleep 1000",
+      exec.id);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Set up the containerizer so the next usage() will fail.
+  EXPECT_CALL(containerizer, usage(_))
+    .WillOnce(Return(Failure("Injected failure")));
+
+  // We expect that the slave will still return a ResourceUsage with one
+  // executor entry, but without any statistics.
+  Future<ResourceUsage> usage = slave.usage();
+
+  AWAIT_READY(usage);
+  ASSERT_EQ(1u, usage.get().executors_size());
+  EXPECT_FALSE(usage.get().executors(0).has_statistics());
+
+  driver.stop();
+  driver.join();
+
+  terminate(slave);
+  wait(slave);
+
+  Shutdown();
+}
+
+
 // This test verifies that label values can be set for tasks and that
 // they are exposed over the slave state endpoint.
 TEST_F(SlaveTest, TaskLabels)


[3/6] mesos git commit: Added a slave integration test in MonitorTest.

Posted by ji...@apache.org.
Added a slave integration test in MonitorTest.

Review: https://reviews.apache.org/r/35264


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3a006ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3a006ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3a006ed

Branch: refs/heads/master
Commit: b3a006ed5bb3ebd3841558cf0316f0cbac9dd18c
Parents: 8274c5e
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Jun 9 12:49:51 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:05 2015 -0700

----------------------------------------------------------------------
 src/tests/monitor_tests.cpp | 104 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 101 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3a006ed/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 197c153..53fb53e 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -17,6 +17,9 @@
  */
 
 #include <limits>
+#include <vector>
+
+#include <gmock/gmock.h>
 
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
@@ -28,16 +31,25 @@
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <stout/bytes.hpp>
+#include <stout/json.hpp>
 #include <stout/nothing.hpp>
 
+#include "slave/constants.hpp"
 #include "slave/monitor.hpp"
 
+#include "tests/mesos.hpp"
+
 using namespace process;
 
+using mesos::internal::master::Master;
+
 using mesos::internal::slave::ResourceMonitor;
+using mesos::internal::slave::Slave;
 
 using std::numeric_limits;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -83,7 +95,7 @@ TEST(MonitorTest, Statistics)
     return usage;
   });
 
-  UPID upid("monitor", address());
+  UPID upid("monitor", process::address());
 
   Future<http::Response> response = http::get(upid, "statistics.json");
   AWAIT_READY(response);
@@ -143,7 +155,7 @@ TEST(MonitorTest, NoExecutor)
     return ResourceUsage();
   });
 
-  UPID upid("monitor", address());
+  UPID upid("monitor", process::address());
 
   Future<http::Response> response = http::get(upid, "statistics.json");
   AWAIT_READY(response);
@@ -184,7 +196,7 @@ TEST(MonitorTest, MissingStatistics)
     return usage;
   });
 
-  UPID upid("monitor", address());
+  UPID upid("monitor", process::address());
 
   Future<http::Response> response = http::get(upid, "statistics.json");
   AWAIT_READY(response);
@@ -197,6 +209,92 @@ TEST(MonitorTest, MissingStatistics)
   AWAIT_EXPECT_RESPONSE_BODY_EQ("[]", response);
 }
 
+
+class MonitorIntegrationTest : public MesosTest {};
+
+
+// This is an end-to-end test that verifies that the slave returns the
+// correct ResourceUsage based on the currently running executors, and
+// that the values obtained from the statistics endpoint are as expected.
+TEST_F(MonitorIntegrationTest, RunningExecutor)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  const Offer& offer = offers.get()[0];
+
+  // Launch a task and wait until it is in RUNNING status.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Hit the statistics endpoint and expect the response to contain the
+  // resource statistics for the running container.
+  UPID upid("monitor", process::address());
+
+  Future<http::Response> response = http::get(upid, "statistics.json");
+  AWAIT_READY(response);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+      "application/json",
+      "Content-Type",
+      response);
+
+  // Verify that the response contains the expected resource limits:
+  // the task's cpus/mem plus the slave's DEFAULT_EXECUTOR_{CPUS,MEM}.
+  Try<JSON::Value> value = JSON::parse(response.get().body);
+  ASSERT_SOME(value);
+
+  Try<JSON::Value> expected = JSON::parse(strings::format(
+      "[{"
+          "\"statistics\":{"
+              "\"cpus_limit\":%g,"
+              "\"mem_limit_bytes\":%lu"
+          "}"
+      "}]",
+      1 + slave::DEFAULT_EXECUTOR_CPUS,
+      (Megabytes(32) + slave::DEFAULT_EXECUTOR_MEM).bytes()).get());
+
+  ASSERT_SOME(expected);
+  EXPECT_TRUE(value.get().contains(expected.get()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {