You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/14 04:17:58 UTC

[1/2] mesos git commit: Added resource estimator abstraction for oversubscription.

Repository: mesos
Updated Branches:
  refs/heads/master a00ffc855 -> 84168d17c


Added resource estimator abstraction for oversubscription.

Review: https://reviews.apache.org/r/33918


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5dc043f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5dc043f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5dc043f

Branch: refs/heads/master
Commit: d5dc043f1e8b4e3a7c7b4df69f865afdaeb0924b
Parents: a00ffc8
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 6 15:30:55 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 17:10:00 2015 -0700

----------------------------------------------------------------------
 include/mesos/slave/resource_estimator.hpp | 61 +++++++++++++++++++
 src/Makefile.am                            |  5 +-
 src/slave/resource_estimator.cpp           | 78 +++++++++++++++++++++++++
 src/slave/resource_estimator.hpp           | 54 +++++++++++++++++
 4 files changed, 197 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
new file mode 100644
index 0000000..936c79c
--- /dev/null
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
+#define __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
+
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace slave {
+
+// A slave component used for oversubscription. It estimates and
+// predicts the total resources used on the slave and informs the
+// master about resources that can be oversubscribed.
+class ResourceEstimator
+{
+public:
+  virtual ~ResourceEstimator() {}
+
+  // Initializes this resource estimator. This method needs to be
+  // called before any other member method is called.
+  // TODO(jieyu): Pass ResourceMonitor* once it's exposed.
+  virtual Try<Nothing> initialize() = 0;
+
+  // Returns the current estimation about the *maximum* amount of
+  // resources that can be oversubscribed on the slave. A new
+  // estimation will invalidate all the previously returned
+  // estimations. The slave will be calling this method continuously
+  // to get the most up-to-date estimation and forward them to the
+  // master. As a result, it is up to the resource estimator to
+  // control the speed of sending estimations to the master. To avoid
+  // overwhelming the master, it is recommended that the resource
+  // estimator should return an estimation only if the current
+  // estimation is significantly different from the previous one.
+  virtual process::Future<Resources> oversubscribed() = 0;
+};
+
+} // namespace slave {
+} // namespace mesos {
+
+#endif // __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 5b2c601..0aba884 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -353,6 +353,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/metrics.cpp						\
 	slave/monitor.cpp						\
 	slave/paths.cpp							\
+	slave/resource_estimator.cpp					\
 	slave/state.cpp							\
 	slave/slave.cpp							\
 	slave/containerizer/containerizer.cpp				\
@@ -451,7 +452,8 @@ nodist_scheduler_HEADERS = ../include/mesos/scheduler/scheduler.pb.h
 slavedir = $(pkgincludedir)/slave
 
 slave_HEADERS =								\
-  $(top_srcdir)/include/mesos/slave/isolator.hpp
+  $(top_srcdir)/include/mesos/slave/isolator.hpp			\
+  $(top_srcdir)/include/mesos/slave/resource_estimator.hpp
 
 if OS_LINUX
   libmesos_no_3rdparty_la_SOURCES += linux/cgroups.cpp
@@ -560,6 +562,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	slave/metrics.hpp						\
 	slave/monitor.hpp						\
 	slave/paths.hpp							\
+	slave/resource_estimator.hpp					\
 	slave/slave.hpp							\
 	slave/state.hpp							\
 	slave/status_update_manager.hpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
new file mode 100644
index 0000000..87edf17
--- /dev/null
+++ b/src/slave/resource_estimator.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
+#include <stout/error.hpp>
+
+#include "slave/resource_estimator.hpp"
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class NoopResourceEstimatorProcess :
+  public Process<NoopResourceEstimatorProcess>
+{
+public:
+  Future<Resources> oversubscribed()
+  {
+    return Resources();
+  }
+};
+
+
+NoopResourceEstimator::~NoopResourceEstimator()
+{
+  if (process.get() != NULL) {
+    terminate(process.get());
+    wait(process.get());
+  }
+}
+
+
+Try<Nothing> NoopResourceEstimator::initialize()
+{
+  if (process.get() != NULL) {
+    return Error("Noop resource estimator has already been initialized");
+  }
+
+  process.reset(new NoopResourceEstimatorProcess());
+  spawn(process.get());
+
+  return Nothing();
+}
+
+
+Future<Resources> NoopResourceEstimator::oversubscribed()
+{
+  if (process.get() == NULL) {
+    return Failure("Noop resource estimator is not initialized");
+  }
+
+  return dispatch(
+      process.get(),
+      &NoopResourceEstimatorProcess::oversubscribed);
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
new file mode 100644
index 0000000..bdf62ba
--- /dev/null
+++ b/src/slave/resource_estimator.hpp
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_RESOURCE_ESTIMATOR_HPP__
+#define __SLAVE_RESOURCE_ESTIMATOR_HPP__
+
+#include <mesos/slave/resource_estimator.hpp>
+
+#include <process/owned.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class NoopResourceEstimatorProcess;
+
+
+// A noop resource estimator which tells the master that no resource
+// on the slave can be oversubscribed. Using this resource estimator
+// will effectively turn off the oversubscription on the slave.
+class NoopResourceEstimator : public mesos::slave::ResourceEstimator
+{
+public:
+  virtual ~NoopResourceEstimator();
+
+  virtual Try<Nothing> initialize();
+  virtual process::Future<Resources> oversubscribed();
+
+protected:
+  process::Owned<NoopResourceEstimatorProcess> process;
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_RESOURCE_ESTIMATOR_HPP__


[2/2] mesos git commit: Integrated resource estimator with the slave.

Posted by ji...@apache.org.
Integrated resource estimator with the slave.

Review: https://reviews.apache.org/r/33919


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84168d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84168d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84168d17

Branch: refs/heads/master
Commit: 84168d17c671a5b37d1cee6194586baa76975994
Parents: d5dc043
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 6 15:31:44 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 19:15:43 2015 -0700

----------------------------------------------------------------------
 include/mesos/slave/resource_estimator.hpp |  9 ++++
 src/Makefile.am                            |  1 +
 src/local/local.cpp                        | 25 ++++++++-
 src/messages/messages.proto                |  8 +++
 src/slave/constants.cpp                    |  6 +++
 src/slave/constants.hpp                    |  5 ++
 src/slave/flags.cpp                        |  4 ++
 src/slave/flags.hpp                        |  1 +
 src/slave/main.cpp                         | 16 +++++-
 src/slave/resource_estimator.cpp           | 19 +++++++
 src/slave/slave.cpp                        | 60 ++++++++++++++++++++-
 src/slave/slave.hpp                        | 13 ++++-
 src/tests/cluster.hpp                      | 19 +++++--
 src/tests/mesos.cpp                        | 19 ++++++-
 src/tests/mesos.hpp                        | 31 +++++++++++
 src/tests/oversubscription_tests.cpp       | 72 +++++++++++++++++++++++++
 src/tests/slave_tests.cpp                  | 10 +---
 17 files changed, 300 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 936c79c..3639615 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -19,11 +19,15 @@
 #ifndef __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
 #define __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
 
+#include <string>
+
 #include <mesos/resources.hpp>
 
 #include <process/future.hpp>
 
+#include <stout/none.hpp>
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 namespace mesos {
@@ -35,6 +39,11 @@ namespace slave {
 class ResourceEstimator
 {
 public:
+  // Create a resource estimator instance of the given type specified
+  // by the user. If the type is not specified, a default resource
+  // estimator instance will be created.
+  static Try<ResourceEstimator*> create(const Option<std::string>& type);
+
   virtual ~ResourceEstimator() {}
 
   // Initializes this resource estimator. This method needs to be

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 0aba884..58ebe1a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1418,6 +1418,7 @@ mesos_tests_SOURCES =				\
   tests/module.cpp				\
   tests/module_tests.cpp			\
   tests/monitor_tests.cpp			\
+  tests/oversubscription_tests.cpp		\
   tests/partition_tests.cpp			\
   tests/paths_tests.cpp				\
   tests/persistent_volume_tests.cpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index dda25ab..84f73e2 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -60,6 +60,7 @@
 #include "module/manager.hpp"
 
 #include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
@@ -91,6 +92,8 @@ using mesos::internal::slave::StatusUpdateManager;
 using mesos::modules::Anonymous;
 using mesos::modules::ModuleManager;
 
+using mesos::slave::ResourceEstimator;
+
 using process::Owned;
 using process::PID;
 using process::RateLimiter;
@@ -123,6 +126,7 @@ static Files* files = NULL;
 static vector<GarbageCollector*>* garbageCollectors = NULL;
 static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
 static vector<Fetcher*>* fetchers = NULL;
+static vector<ResourceEstimator*>* resourceEstimators = NULL;
 
 
 PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -287,6 +291,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
   garbageCollectors = new vector<GarbageCollector*>();
   statusUpdateManagers = new vector<StatusUpdateManager*>();
   fetchers = new vector<Fetcher*>();
+  resourceEstimators = new vector<ResourceEstimator*>();
 
   vector<UPID> pids;
 
@@ -306,6 +311,16 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     statusUpdateManagers->push_back(new StatusUpdateManager(flags));
     fetchers->push_back(new Fetcher());
 
+    Try<ResourceEstimator*> resourceEstimator =
+      ResourceEstimator::create(flags.resource_estimator);
+
+    if (resourceEstimator.isError()) {
+      EXIT(1) << "Failed to create resource estimator: "
+              << resourceEstimator.error();
+    }
+
+    resourceEstimators->push_back(resourceEstimator.get());
+
     Try<Containerizer*> containerizer =
       Containerizer::create(flags, true, fetchers->back());
 
@@ -321,7 +336,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
         containerizer.get(),
         files,
         garbageCollectors->back(),
-        statusUpdateManagers->back());
+        statusUpdateManagers->back(),
+        resourceEstimators->back());
 
     slaves[containerizer.get()] = slave;
 
@@ -391,6 +407,13 @@ void shutdown()
     delete fetchers;
     fetchers = NULL;
 
+    foreach (ResourceEstimator* estimator, *resourceEstimators) {
+      delete estimator;
+    }
+
+    delete resourceEstimators;
+    resourceEstimators = NULL;
+
     delete registrar;
     registrar = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 98d859f..19e2444 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -333,6 +333,14 @@ message CheckpointResourcesMessage {
 }
 
 
+// This message is sent by the slave to the master to inform the
+// master about the currently available oversubscribed resources.
+message UpdateOversubscribedResourcesMessage {
+  required SlaveID slave_id = 1;
+  repeated Resource resources = 2;
+}
+
+
 message RegisterExecutorMessage {
   required FrameworkID framework_id = 1;
   required ExecutorID executor_id = 2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 2a99b11..07f699a 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -57,6 +57,12 @@ Duration MASTER_PING_TIMEOUT()
   return master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS;
 }
 
+
+Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN()
+{
+  return Seconds(5);
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index fd1c1ab..df02043 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -104,6 +104,11 @@ extern const std::string DEFAULT_AUTHENTICATEE;
 // trigger a re-detection of the master to cause a re-registration.
 Duration MASTER_PING_TIMEOUT();
 
+
+// To avoid overwhelming the master, we enforce a minimal delay
+// between two subsequent UpdateOversubscribedResourcesMessages.
+Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN();
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index d0932b0..f35c76a 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -436,4 +436,8 @@ mesos::internal::slave::Flags::Flags()
       "hooks",
       "A comma separated list of hook modules to be\n"
       "installed inside the slave.");
+
+  add(&Flags::resource_estimator,
+      "resource_estimator",
+      "The name of the resource estimator to use for oversubscription.");
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 4c50be3..ca7cc13 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -97,6 +97,7 @@ public:
   Option<Modules> modules;
   std::string authenticatee;
   Option<std::string> hooks;
+  Option<std::string> resource_estimator;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index c62d3ab..f762f5b 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -42,6 +42,7 @@
 #include "module/manager.hpp"
 
 #include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
@@ -51,6 +52,8 @@ using namespace mesos::internal::slave;
 using mesos::modules::Anonymous;
 using mesos::modules::ModuleManager;
 
+using mesos::slave::ResourceEstimator;
+
 using mesos::SlaveInfo;
 
 using std::cerr;
@@ -203,19 +206,30 @@ int main(int argc, char** argv)
   GarbageCollector gc;
   StatusUpdateManager statusUpdateManager(flags);
 
+  Try<ResourceEstimator*> resourceEstimator =
+    ResourceEstimator::create(flags.resource_estimator);
+
+  if (resourceEstimator.isError()) {
+    EXIT(1) << "Failed to create resource estimator: "
+            << resourceEstimator.error();
+  }
+
   Slave* slave = new Slave(
       flags,
       detector.get(),
       containerizer.get(),
       &files,
       &gc,
-      &statusUpdateManager);
+      &statusUpdateManager,
+      resourceEstimator.get());
 
   process::spawn(slave);
   process::wait(slave->self());
 
   delete slave;
 
+  delete resourceEstimator.get();
+
   delete detector.get();
 
   delete containerizer.get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 87edf17..13d706c 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -25,6 +25,25 @@
 
 using namespace process;
 
+using std::string;
+
+namespace mesos {
+namespace slave {
+
+Try<ResourceEstimator*> ResourceEstimator::create(const Option<string>& type)
+{
+  // TODO(jieyu): Support loading resource estimator from module.
+  if (type.isNone()) {
+    return new internal::slave::NoopResourceEstimator();
+  }
+
+  return Error("Unsupported resource estimator '" + type.get() + "'");
+}
+
+} // namespace slave {
+} // namespace mesos {
+
+
 namespace mesos {
 namespace internal {
 namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 39967cd..132f83e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
+using mesos::slave::ResourceEstimator;
+
 using std::list;
 using std::map;
 using std::set;
@@ -115,7 +117,8 @@ Slave::Slave(const slave::Flags& _flags,
              Containerizer* _containerizer,
              Files* _files,
              GarbageCollector* _gc,
-             StatusUpdateManager* _statusUpdateManager)
+             StatusUpdateManager* _statusUpdateManager,
+             ResourceEstimator* _resourceEstimator)
   : ProcessBase(process::ID::generate("slave")),
     state(RECOVERING),
     flags(_flags),
@@ -134,7 +137,8 @@ Slave::Slave(const slave::Flags& _flags,
     authenticating(None()),
     authenticated(false),
     reauthenticate(false),
-    executorDirectoryMaxAllowedAge(age(0)) {}
+    executorDirectoryMaxAllowedAge(age(0)),
+    resourceEstimator(_resourceEstimator) {}
 
 
 Slave::~Slave()
@@ -319,6 +323,13 @@ void Slave::initialize()
             << "' for --gc_disk_headroom. Must be between 0.0 and 1.0.";
   }
 
+  // TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
+  Try<Nothing> initialize = resourceEstimator->initialize();
+  if (initialize.isError()) {
+    EXIT(1) << "Failed to initialize the resource estimator: "
+            << initialize.error();
+  }
+
   // Ensure slave work directory exists.
   CHECK_SOME(os::mkdir(flags.work_dir))
     << "Failed to create slave work directory '" << flags.work_dir << "'";
@@ -3967,6 +3978,9 @@ void Slave::__recover(const Future<Nothing>& future)
   if (flags.recover == "reconnect") {
     state = DISCONNECTED;
 
+    // Start to detect available oversubscribed resources.
+    updateOversubscribedResources();
+
     // Start detecting masters.
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
@@ -4056,6 +4070,48 @@ Future<Nothing> Slave::garbageCollect(const string& path)
 }
 
 
+void Slave::updateOversubscribedResources()
+{
+  // TODO(jieyu): Consider switching to a push model in which the
+  // slave registers a callback with the resource estimator, and the
+  // resource estimator invokes the callback whenever a new estimation
+  // is ready (similar to the allocator/master interface).
+
+  if (state != RUNNING) {
+    delay(Seconds(1), self(), &Self::updateOversubscribedResources);
+    return;
+  }
+
+  resourceEstimator->oversubscribed()
+    .onAny(defer(self(), &Slave::_updateOversubscribedResources, lambda::_1));
+}
+
+
+void Slave::_updateOversubscribedResources(const Future<Resources>& future)
+{
+  if (!future.isReady()) {
+    LOG(ERROR) << "Failed to estimate oversubscribed resources: "
+               << (future.isFailed() ? future.failure() : "discarded");
+  } else if (state == RUNNING) {
+    CHECK_SOME(master);
+
+    LOG(INFO) << "Updating available oversubscribed resources to "
+              << future.get();
+
+    UpdateOversubscribedResourcesMessage message;
+    message.mutable_slave_id()->CopyFrom(info.id());
+    message.mutable_resources()->CopyFrom(future.get());
+
+    send(master.get(), message);
+  }
+
+  // TODO(jieyu): Consider making the interval configurable.
+  delay(UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(),
+        self(),
+        &Self::updateOversubscribedResources);
+}
+
+
 // TODO(dhamon): Move these to their own metrics.hpp|cpp.
 double Slave::_tasks_staging()
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index adb52b5..b62ed7b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -33,6 +33,8 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#include <mesos/slave/resource_estimator.hpp>
+
 #include <process/http.hpp>
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -88,7 +90,8 @@ public:
         Containerizer* containerizer,
         Files* files,
         GarbageCollector* gc,
-        StatusUpdateManager* statusUpdateManager);
+        StatusUpdateManager* statusUpdateManager,
+        mesos::slave::ResourceEstimator* resourceEstimator);
 
   virtual ~Slave();
 
@@ -430,6 +433,11 @@ private:
       const FrameworkID& frameworkId,
       const Executor* executor);
 
+  // Polls oversubscribed resources estimations from resources
+  // estimator and forwards estimations to the master.
+  void updateOversubscribedResources();
+  void _updateOversubscribedResources(const process::Future<Resources>& future);
+
   const Flags flags;
 
   SlaveInfo info;
@@ -458,6 +466,7 @@ private:
   process::Time startTime;
 
   GarbageCollector* gc;
+
   ResourceMonitor monitor;
 
   StatusUpdateManager* statusUpdateManager;
@@ -498,6 +507,8 @@ private:
   // Maximum age of executor directories. Will be recomputed
   // periodically every flags.disk_watch_interval.
   Duration executorDirectoryMaxAllowedAge;
+
+  mesos::slave::ResourceEstimator* resourceEstimator;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 9506166..7370c77 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -70,6 +70,7 @@
 
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
 #include "slave/slave.hpp"
 #include "slave/status_update_manager.hpp"
 
@@ -169,7 +170,8 @@ public:
         const Option<slave::Containerizer*>& containerizer = None(),
         const Option<MasterDetector*>& detector = None(),
         const Option<slave::GarbageCollector*>& gc = None(),
-        const Option<slave::StatusUpdateManager*>& statusUpdateManager =
+        const Option<slave::StatusUpdateManager*>& statusUpdateManager = None(),
+        const Option<mesos::slave::ResourceEstimator*>& resourceEstimator =
           None());
 
     // Stops and cleans up a slave at the specified PID. If 'shutdown'
@@ -198,6 +200,7 @@ public:
       slave::Containerizer* containerizer;
       bool createdContainerizer; // Whether we own the containerizer.
 
+      process::Owned<mesos::slave::ResourceEstimator> resourceEstimator;
       process::Owned<slave::Fetcher> fetcher;
       process::Owned<slave::StatusUpdateManager> statusUpdateManager;
       process::Owned<slave::GarbageCollector> gc;
@@ -530,7 +533,8 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
     const Option<slave::Containerizer*>& containerizer,
     const Option<MasterDetector*>& detector,
     const Option<slave::GarbageCollector*>& gc,
-    const Option<slave::StatusUpdateManager*>& statusUpdateManager)
+    const Option<slave::StatusUpdateManager*>& statusUpdateManager,
+    const Option<mesos::slave::ResourceEstimator*>& resourceEstimator)
 {
   // TODO(benh): Create a work directory if using the default.
 
@@ -551,6 +555,14 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
     slave.createdContainerizer = true;
   }
 
+  if (resourceEstimator.isNone()) {
+    Try<mesos::slave::ResourceEstimator*> _resourceEstimator =
+      mesos::slave::ResourceEstimator::create(flags.resource_estimator);
+
+    CHECK_SOME(_resourceEstimator);
+    slave.resourceEstimator.reset(_resourceEstimator.get());
+  }
+
   // Get a detector for the master(s) if one wasn't provided.
   if (detector.isNone()) {
     slave.detector = masters->detector();
@@ -574,7 +586,8 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
       slave.containerizer,
       &cluster->files,
       gc.get(slave.gc.get()),
-      statusUpdateManager.get(slave.statusUpdateManager.get()));
+      statusUpdateManager.get(slave.statusUpdateManager.get()),
+      resourceEstimator.get(slave.resourceEstimator.get()));
 
   process::PID<slave::Slave> pid = process::spawn(slave.slave);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index bc082e8..1d5639c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -302,7 +302,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 
   Try<PID<slave::Slave>> pid = cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
-          containerizer,
+      containerizer,
       detector);
 
   if (pid.isError()) {
@@ -316,6 +316,20 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 }
 
 
+Try<PID<slave::Slave>> MesosTest::StartSlave(
+    mesos::slave::ResourceEstimator* resourceEstimator,
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
+      None(),
+      None(),
+      None(),
+      resourceEstimator);
+}
+
+
 void MesosTest::Stop(const PID<master::Master>& pid)
 {
   cluster.masters.stop(pid);
@@ -366,7 +380,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
       containerizer,
       &files,
       &gc,
-      statusUpdateManager = new slave::StatusUpdateManager(flags))
+      statusUpdateManager = new slave::StatusUpdateManager(flags),
+      &resourceEstimator)
 {
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _))

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 563b833..df8cd20 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -174,6 +174,11 @@ protected:
       MasterDetector* detector,
       const Option<slave::Flags>& flags = None());
 
+  // Starts a slave with the specified resource estimator and flags.
+  virtual Try<process::PID<slave::Slave>> StartSlave(
+      mesos::slave::ResourceEstimator* resourceEstimator,
+      const Option<slave::Flags>& flags = None());
+
   // Stop the specified master.
   virtual void Stop(
       const process::PID<master::Master>& pid);
@@ -694,6 +699,31 @@ public:
 };
 
 
+class MockResourceEstimator : public mesos::slave::ResourceEstimator
+{
+public:
+  // Mocks of the 'mesos::slave::ResourceEstimator' methods.
+  MOCK_METHOD0(initialize, Try<Nothing>());
+  MOCK_METHOD0(oversubscribed, process::Future<Resources>());
+
+  // By default, initialization succeeds and an empty set of resources
+  // is reported as oversubscribed; tests can override either with
+  // their own 'EXPECT_CALL' on the mock.
+  //
+  // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+  // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
+  // for more details.
+  MockResourceEstimator()
+  {
+    EXPECT_CALL(*this, initialize())
+      .WillRepeatedly(Return(Nothing()));
+
+    EXPECT_CALL(*this, oversubscribed())
+      .WillRepeatedly(Return(Resources()));
+  }
+};
+
+
 // Definition of a mock Slave to be used in tests with gmock, covering
 // potential races between runTask and killTask.
 class MockSlave : public slave::Slave
@@ -757,6 +787,7 @@ public:
 private:
   Files files;
   MockGarbageCollector gc;
+  MockResourceEstimator resourceEstimator;
   slave::StatusUpdateManager* statusUpdateManager;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
new file mode 100644
index 0000000..64c2ede
--- /dev/null
+++ b/src/tests/oversubscription_tests.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/gtest.hpp>
+
+#include <stout/gtest.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class OversubscriptionSlaveTest : public MesosTest {};
+
+
+// This test verifies that the slave forwards its estimate of the
+// available oversubscribed resources to the master.
+TEST_F(OversubscriptionSlaveTest, UpdateOversubscribedResourcesMessage)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateOversubscribedResourcesMessage> message =
+    FUTURE_PROTOBUF(UpdateOversubscribedResourcesMessage(), _, _);
+
+  // Even an empty estimate is expected to be forwarded to the master.
+  MockResourceEstimator resourceEstimator;
+  EXPECT_CALL(resourceEstimator, oversubscribed())
+    .WillRepeatedly(Return(Resources()));
+
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(message);
+
+  Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5bd722e..acae497 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -543,17 +543,10 @@ TEST_F(SlaveTest, ComamndTaskWithArguments)
 // mesos-executor forking. For more details of this see MESOS-1873.
 TEST_F(SlaveTest, GetExecutorInfo)
 {
-  // Create a thin dummy Slave to access underlying getExecutorInfo().
-  // Testing this method should not necessarily require an integration
-  // test as with most other methods here.
-  slave::Flags flags = CreateSlaveFlags();
   TestContainerizer containerizer;
   StandaloneMasterDetector detector;
-  Files files;
-  slave::StatusUpdateManager updateManager(flags);
 
-  slave::GarbageCollector gc;
-  Slave slave(flags, &detector, &containerizer, &files, &gc, &updateManager);
+  MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
 
   FrameworkID frameworkId;
   frameworkId.set_value("20141010-221431-251662764-60288-32120-0000");
@@ -585,6 +578,7 @@ TEST_F(SlaveTest, GetExecutorInfo)
   EXPECT_NE(string::npos, executor.command().value().find("mesos-executor"));
 }
 
+
 // This test runs a command without the command user field set. The
 // command will verify the assumption that the command is run as the
 // slave user (in this case, root).