You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/14 04:17:58 UTC
[1/2] mesos git commit: Added resource estimator abstraction for
oversubscription.
Repository: mesos
Updated Branches:
refs/heads/master a00ffc855 -> 84168d17c
Added resource estimator abstraction for oversubscription.
Review: https://reviews.apache.org/r/33918
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5dc043f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5dc043f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5dc043f
Branch: refs/heads/master
Commit: d5dc043f1e8b4e3a7c7b4df69f865afdaeb0924b
Parents: a00ffc8
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 6 15:30:55 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 17:10:00 2015 -0700
----------------------------------------------------------------------
include/mesos/slave/resource_estimator.hpp | 61 +++++++++++++++++++
src/Makefile.am | 5 +-
src/slave/resource_estimator.cpp | 78 +++++++++++++++++++++++++
src/slave/resource_estimator.hpp | 54 +++++++++++++++++
4 files changed, 197 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
new file mode 100644
index 0000000..936c79c
--- /dev/null
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
+#define __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
+
+#include <mesos/resources.hpp>
+
+#include <process/future.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace slave {
+
+// A slave component used for oversubscription. It estimates and
+// predicts the total resources used on the slave and informs the
+// master about resources that can be oversubscribed.
+class ResourceEstimator
+{
+public:
+ virtual ~ResourceEstimator() {}
+
+ // Initializes this resource estimator. This method needs to be
+ // called before any other member method is called.
+ // TODO(jieyu): Pass ResourceMonitor* once it's exposed.
+ virtual Try<Nothing> initialize() = 0;
+
+ // Returns the current estimation about the *maximum* amount of
+ // resources that can be oversubscribed on the slave. A new
+ // estimation will invalidate all the previously returned
+ // estimations. The slave calls this method continuously to get
+ // the most up-to-date estimation and forwards it to the master.
+ // As a result, it is up to the resource estimator to
+ // control the speed of sending estimations to the master. To avoid
+ // overwhelming the master, it is recommended that the resource
+ // estimator should return an estimation only if the current
+ // estimation is significantly different from the previous one.
+ virtual process::Future<Resources> oversubscribed() = 0;
+};
+
+} // namespace slave {
+} // namespace mesos {
+
+#endif // __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 5b2c601..0aba884 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -353,6 +353,7 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/metrics.cpp \
slave/monitor.cpp \
slave/paths.cpp \
+ slave/resource_estimator.cpp \
slave/state.cpp \
slave/slave.cpp \
slave/containerizer/containerizer.cpp \
@@ -451,7 +452,8 @@ nodist_scheduler_HEADERS = ../include/mesos/scheduler/scheduler.pb.h
slavedir = $(pkgincludedir)/slave
slave_HEADERS = \
- $(top_srcdir)/include/mesos/slave/isolator.hpp
+ $(top_srcdir)/include/mesos/slave/isolator.hpp \
+ $(top_srcdir)/include/mesos/slave/resource_estimator.hpp
if OS_LINUX
libmesos_no_3rdparty_la_SOURCES += linux/cgroups.cpp
@@ -560,6 +562,7 @@ libmesos_no_3rdparty_la_SOURCES += \
slave/metrics.hpp \
slave/monitor.hpp \
slave/paths.hpp \
+ slave/resource_estimator.hpp \
slave/slave.hpp \
slave/state.hpp \
slave/status_update_manager.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
new file mode 100644
index 0000000..87edf17
--- /dev/null
+++ b/src/slave/resource_estimator.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+
+#include <stout/error.hpp>
+
+#include "slave/resource_estimator.hpp"
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class NoopResourceEstimatorProcess :
+ public Process<NoopResourceEstimatorProcess>
+{
+public:
+ Future<Resources> oversubscribed()
+ {
+ return Resources();
+ }
+};
+
+
+NoopResourceEstimator::~NoopResourceEstimator()
+{
+ if (process.get() != NULL) {
+ terminate(process.get());
+ wait(process.get());
+ }
+}
+
+
+Try<Nothing> NoopResourceEstimator::initialize()
+{
+ if (process.get() != NULL) {
+ return Error("Noop resource estimator has already been initialized");
+ }
+
+ process.reset(new NoopResourceEstimatorProcess());
+ spawn(process.get());
+
+ return Nothing();
+}
+
+
+Future<Resources> NoopResourceEstimator::oversubscribed()
+{
+ if (process.get() == NULL) {
+ return Failure("Noop resource estimator is not initialized");
+ }
+
+ return dispatch(
+ process.get(),
+ &NoopResourceEstimatorProcess::oversubscribed);
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5dc043f/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
new file mode 100644
index 0000000..bdf62ba
--- /dev/null
+++ b/src/slave/resource_estimator.hpp
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_RESOURCE_ESTIMATOR_HPP__
+#define __SLAVE_RESOURCE_ESTIMATOR_HPP__
+
+#include <mesos/slave/resource_estimator.hpp>
+
+#include <process/owned.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class NoopResourceEstimatorProcess;
+
+
+// A noop resource estimator which tells the master that no resource
+// on the slave can be oversubscribed. Using this resource estimator
+// will effectively turn off oversubscription on the slave.
+class NoopResourceEstimator : public mesos::slave::ResourceEstimator
+{
+public:
+ virtual ~NoopResourceEstimator();
+
+ virtual Try<Nothing> initialize();
+ virtual process::Future<Resources> oversubscribed();
+
+protected:
+ process::Owned<NoopResourceEstimatorProcess> process;
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_RESOURCE_ESTIMATOR_HPP__
[2/2] mesos git commit: Integrated resource estimator with the slave.
Posted by ji...@apache.org.
Integrated resource estimator with the slave.
Review: https://reviews.apache.org/r/33919
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84168d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84168d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84168d17
Branch: refs/heads/master
Commit: 84168d17c671a5b37d1cee6194586baa76975994
Parents: d5dc043
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 6 15:31:44 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 19:15:43 2015 -0700
----------------------------------------------------------------------
include/mesos/slave/resource_estimator.hpp | 9 ++++
src/Makefile.am | 1 +
src/local/local.cpp | 25 ++++++++-
src/messages/messages.proto | 8 +++
src/slave/constants.cpp | 6 +++
src/slave/constants.hpp | 5 ++
src/slave/flags.cpp | 4 ++
src/slave/flags.hpp | 1 +
src/slave/main.cpp | 16 +++++-
src/slave/resource_estimator.cpp | 19 +++++++
src/slave/slave.cpp | 60 ++++++++++++++++++++-
src/slave/slave.hpp | 13 ++++-
src/tests/cluster.hpp | 19 +++++--
src/tests/mesos.cpp | 19 ++++++-
src/tests/mesos.hpp | 31 +++++++++++
src/tests/oversubscription_tests.cpp | 72 +++++++++++++++++++++++++
src/tests/slave_tests.cpp | 10 +---
17 files changed, 300 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 936c79c..3639615 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -19,11 +19,15 @@
#ifndef __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
#define __MESOS_SLAVE_RESOURCE_ESTIMATOR_HPP__
+#include <string>
+
#include <mesos/resources.hpp>
#include <process/future.hpp>
+#include <stout/none.hpp>
#include <stout/nothing.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
namespace mesos {
@@ -35,6 +39,11 @@ namespace slave {
class ResourceEstimator
{
public:
+ // Creates a resource estimator instance of the given type specified
+ // by the user. If the type is not specified, a default resource
+ // estimator instance will be created.
+ static Try<ResourceEstimator*> create(const Option<std::string>& type);
+
virtual ~ResourceEstimator() {}
// Initializes this resource estimator. This method needs to be
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 0aba884..58ebe1a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1418,6 +1418,7 @@ mesos_tests_SOURCES = \
tests/module.cpp \
tests/module_tests.cpp \
tests/monitor_tests.cpp \
+ tests/oversubscription_tests.cpp \
tests/partition_tests.cpp \
tests/paths_tests.cpp \
tests/persistent_volume_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index dda25ab..84f73e2 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -60,6 +60,7 @@
#include "module/manager.hpp"
#include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
@@ -91,6 +92,8 @@ using mesos::internal::slave::StatusUpdateManager;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
+using mesos::slave::ResourceEstimator;
+
using process::Owned;
using process::PID;
using process::RateLimiter;
@@ -123,6 +126,7 @@ static Files* files = NULL;
static vector<GarbageCollector*>* garbageCollectors = NULL;
static vector<StatusUpdateManager*>* statusUpdateManagers = NULL;
static vector<Fetcher*>* fetchers = NULL;
+static vector<ResourceEstimator*>* resourceEstimators = NULL;
PID<Master> launch(const Flags& flags, Allocator* _allocator)
@@ -287,6 +291,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
garbageCollectors = new vector<GarbageCollector*>();
statusUpdateManagers = new vector<StatusUpdateManager*>();
fetchers = new vector<Fetcher*>();
+ resourceEstimators = new vector<ResourceEstimator*>();
vector<UPID> pids;
@@ -306,6 +311,16 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
statusUpdateManagers->push_back(new StatusUpdateManager(flags));
fetchers->push_back(new Fetcher());
+ Try<ResourceEstimator*> resourceEstimator =
+ ResourceEstimator::create(flags.resource_estimator);
+
+ if (resourceEstimator.isError()) {
+ EXIT(1) << "Failed to create resource estimator: "
+ << resourceEstimator.error();
+ }
+
+ resourceEstimators->push_back(resourceEstimator.get());
+
Try<Containerizer*> containerizer =
Containerizer::create(flags, true, fetchers->back());
@@ -321,7 +336,8 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
containerizer.get(),
files,
garbageCollectors->back(),
- statusUpdateManagers->back());
+ statusUpdateManagers->back(),
+ resourceEstimators->back());
slaves[containerizer.get()] = slave;
@@ -391,6 +407,13 @@ void shutdown()
delete fetchers;
fetchers = NULL;
+ foreach (ResourceEstimator* estimator, *resourceEstimators) {
+ delete estimator;
+ }
+
+ delete resourceEstimators;
+ resourceEstimators = NULL;
+
delete registrar;
registrar = NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 98d859f..19e2444 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -333,6 +333,14 @@ message CheckpointResourcesMessage {
}
+// This message is sent by the slave to the master to inform the
+// master about the currently available oversubscribed resources.
+message UpdateOversubscribedResourcesMessage {
+ required SlaveID slave_id = 1;
+ repeated Resource resources = 2;
+}
+
+
message RegisterExecutorMessage {
required FrameworkID framework_id = 1;
required ExecutorID executor_id = 2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 2a99b11..07f699a 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -57,6 +57,12 @@ Duration MASTER_PING_TIMEOUT()
return master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS;
}
+
+Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN()
+{
+ return Seconds(5);
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index fd1c1ab..df02043 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -104,6 +104,11 @@ extern const std::string DEFAULT_AUTHENTICATEE;
// trigger a re-detection of the master to cause a re-registration.
Duration MASTER_PING_TIMEOUT();
+
+// To avoid overwhelming the master, we enforce a minimum delay
+// between two consecutive UpdateOversubscribedResourcesMessages.
+Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN();
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index d0932b0..f35c76a 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -436,4 +436,8 @@ mesos::internal::slave::Flags::Flags()
"hooks",
"A comma separated list of hook modules to be\n"
"installed inside the slave.");
+
+ add(&Flags::resource_estimator,
+ "resource_estimator",
+ "The name of the resource estimator to use for oversubscription.");
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 4c50be3..ca7cc13 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -97,6 +97,7 @@ public:
Option<Modules> modules;
std::string authenticatee;
Option<std::string> hooks;
+ Option<std::string> resource_estimator;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index c62d3ab..f762f5b 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -42,6 +42,7 @@
#include "module/manager.hpp"
#include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
@@ -51,6 +52,8 @@ using namespace mesos::internal::slave;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
+using mesos::slave::ResourceEstimator;
+
using mesos::SlaveInfo;
using std::cerr;
@@ -203,19 +206,30 @@ int main(int argc, char** argv)
GarbageCollector gc;
StatusUpdateManager statusUpdateManager(flags);
+ Try<ResourceEstimator*> resourceEstimator =
+ ResourceEstimator::create(flags.resource_estimator);
+
+ if (resourceEstimator.isError()) {
+ EXIT(1) << "Failed to create resource estimator: "
+ << resourceEstimator.error();
+ }
+
Slave* slave = new Slave(
flags,
detector.get(),
containerizer.get(),
&files,
&gc,
- &statusUpdateManager);
+ &statusUpdateManager,
+ resourceEstimator.get());
process::spawn(slave);
process::wait(slave->self());
delete slave;
+ delete resourceEstimator.get();
+
delete detector.get();
delete containerizer.get();
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 87edf17..13d706c 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -25,6 +25,25 @@
using namespace process;
+using std::string;
+
+namespace mesos {
+namespace slave {
+
+Try<ResourceEstimator*> ResourceEstimator::create(const Option<string>& type)
+{
+ // TODO(jieyu): Support loading resource estimator from module.
+ if (type.isNone()) {
+ return new internal::slave::NoopResourceEstimator();
+ }
+
+ return Error("Unsupported resource estimator '" + type.get() + "'");
+}
+
+} // namespace slave {
+} // namespace mesos {
+
+
namespace mesos {
namespace internal {
namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 39967cd..132f83e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -87,6 +87,8 @@
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
+using mesos::slave::ResourceEstimator;
+
using std::list;
using std::map;
using std::set;
@@ -115,7 +117,8 @@ Slave::Slave(const slave::Flags& _flags,
Containerizer* _containerizer,
Files* _files,
GarbageCollector* _gc,
- StatusUpdateManager* _statusUpdateManager)
+ StatusUpdateManager* _statusUpdateManager,
+ ResourceEstimator* _resourceEstimator)
: ProcessBase(process::ID::generate("slave")),
state(RECOVERING),
flags(_flags),
@@ -134,7 +137,8 @@ Slave::Slave(const slave::Flags& _flags,
authenticating(None()),
authenticated(false),
reauthenticate(false),
- executorDirectoryMaxAllowedAge(age(0)) {}
+ executorDirectoryMaxAllowedAge(age(0)),
+ resourceEstimator(_resourceEstimator) {}
Slave::~Slave()
@@ -319,6 +323,13 @@ void Slave::initialize()
<< "' for --gc_disk_headroom. Must be between 0.0 and 1.0.";
}
+ // TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
+ Try<Nothing> initialize = resourceEstimator->initialize();
+ if (initialize.isError()) {
+ EXIT(1) << "Failed to initialize the resource estimator: "
+ << initialize.error();
+ }
+
// Ensure slave work directory exists.
CHECK_SOME(os::mkdir(flags.work_dir))
<< "Failed to create slave work directory '" << flags.work_dir << "'";
@@ -3967,6 +3978,9 @@ void Slave::__recover(const Future<Nothing>& future)
if (flags.recover == "reconnect") {
state = DISCONNECTED;
+ // Start to detect available oversubscribed resources.
+ updateOversubscribedResources();
+
// Start detecting masters.
detection = detector->detect()
.onAny(defer(self(), &Slave::detected, lambda::_1));
@@ -4056,6 +4070,48 @@ Future<Nothing> Slave::garbageCollect(const string& path)
}
+void Slave::updateOversubscribedResources()
+{
+ // TODO(jieyu): Consider switching to a push model in which the
+ // slave registers a callback with the resource estimator, and the
+ // resource estimator invokes the callback whenever a new estimation
+ // is ready (similar to the allocator/master interface).
+
+ if (state != RUNNING) {
+ delay(Seconds(1), self(), &Self::updateOversubscribedResources);
+ return;
+ }
+
+ resourceEstimator->oversubscribed()
+ .onAny(defer(self(), &Slave::_updateOversubscribedResources, lambda::_1));
+}
+
+
+void Slave::_updateOversubscribedResources(const Future<Resources>& future)
+{
+ if (!future.isReady()) {
+ LOG(ERROR) << "Failed to estimate oversubscribed resources: "
+ << (future.isFailed() ? future.failure() : "discarded");
+ } else if (state == RUNNING) {
+ CHECK_SOME(master);
+
+ LOG(INFO) << "Updating available oversubscribed resources to "
+ << future.get();
+
+ UpdateOversubscribedResourcesMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_resources()->CopyFrom(future.get());
+
+ send(master.get(), message);
+ }
+
+ // TODO(jieyu): Consider making the interval configurable.
+ delay(UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(),
+ self(),
+ &Self::updateOversubscribedResources);
+}
+
+
// TODO(dhamon): Move these to their own metrics.hpp|cpp.
double Slave::_tasks_staging()
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index adb52b5..b62ed7b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -33,6 +33,8 @@
#include <mesos/module/authenticatee.hpp>
+#include <mesos/slave/resource_estimator.hpp>
+
#include <process/http.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -88,7 +90,8 @@ public:
Containerizer* containerizer,
Files* files,
GarbageCollector* gc,
- StatusUpdateManager* statusUpdateManager);
+ StatusUpdateManager* statusUpdateManager,
+ mesos::slave::ResourceEstimator* resourceEstimator);
virtual ~Slave();
@@ -430,6 +433,11 @@ private:
const FrameworkID& frameworkId,
const Executor* executor);
+ // Polls the resource estimator for oversubscribed resource
+ // estimations and forwards them to the master.
+ void updateOversubscribedResources();
+ void _updateOversubscribedResources(const process::Future<Resources>& future);
+
const Flags flags;
SlaveInfo info;
@@ -458,6 +466,7 @@ private:
process::Time startTime;
GarbageCollector* gc;
+
ResourceMonitor monitor;
StatusUpdateManager* statusUpdateManager;
@@ -498,6 +507,8 @@ private:
// Maximum age of executor directories. Will be recomputed
// periodically every flags.disk_watch_interval.
Duration executorDirectoryMaxAllowedAge;
+
+ mesos::slave::ResourceEstimator* resourceEstimator;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 9506166..7370c77 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -70,6 +70,7 @@
#include "slave/flags.hpp"
#include "slave/gc.hpp"
+#include "slave/resource_estimator.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
@@ -169,7 +170,8 @@ public:
const Option<slave::Containerizer*>& containerizer = None(),
const Option<MasterDetector*>& detector = None(),
const Option<slave::GarbageCollector*>& gc = None(),
- const Option<slave::StatusUpdateManager*>& statusUpdateManager =
+ const Option<slave::StatusUpdateManager*>& statusUpdateManager = None(),
+ const Option<mesos::slave::ResourceEstimator*>& resourceEstimator =
None());
// Stops and cleans up a slave at the specified PID. If 'shutdown'
@@ -198,6 +200,7 @@ public:
slave::Containerizer* containerizer;
bool createdContainerizer; // Whether we own the containerizer.
+ process::Owned<mesos::slave::ResourceEstimator> resourceEstimator;
process::Owned<slave::Fetcher> fetcher;
process::Owned<slave::StatusUpdateManager> statusUpdateManager;
process::Owned<slave::GarbageCollector> gc;
@@ -530,7 +533,8 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
const Option<slave::Containerizer*>& containerizer,
const Option<MasterDetector*>& detector,
const Option<slave::GarbageCollector*>& gc,
- const Option<slave::StatusUpdateManager*>& statusUpdateManager)
+ const Option<slave::StatusUpdateManager*>& statusUpdateManager,
+ const Option<mesos::slave::ResourceEstimator*>& resourceEstimator)
{
// TODO(benh): Create a work directory if using the default.
@@ -551,6 +555,14 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
slave.createdContainerizer = true;
}
+ if (resourceEstimator.isNone()) {
+ Try<mesos::slave::ResourceEstimator*> _resourceEstimator =
+ mesos::slave::ResourceEstimator::create(flags.resource_estimator);
+
+ CHECK_SOME(_resourceEstimator);
+ slave.resourceEstimator.reset(_resourceEstimator.get());
+ }
+
// Get a detector for the master(s) if one wasn't provided.
if (detector.isNone()) {
slave.detector = masters->detector();
@@ -574,7 +586,8 @@ inline Try<process::PID<slave::Slave>> Cluster::Slaves::start(
slave.containerizer,
&cluster->files,
gc.get(slave.gc.get()),
- statusUpdateManager.get(slave.statusUpdateManager.get()));
+ statusUpdateManager.get(slave.statusUpdateManager.get()),
+ resourceEstimator.get(slave.resourceEstimator.get()));
process::PID<slave::Slave> pid = process::spawn(slave.slave);
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index bc082e8..1d5639c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -302,7 +302,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
Try<PID<slave::Slave>> pid = cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
- containerizer,
+ containerizer,
detector);
if (pid.isError()) {
@@ -316,6 +316,20 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
}
+Try<PID<slave::Slave>> MesosTest::StartSlave(
+ mesos::slave::ResourceEstimator* resourceEstimator,
+ const Option<slave::Flags>& flags)
+{
+ return cluster.slaves.start(
+ flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
+ None(),
+ None(),
+ None(),
+ resourceEstimator);
+}
+
+
void MesosTest::Stop(const PID<master::Master>& pid)
{
cluster.masters.stop(pid);
@@ -366,7 +380,8 @@ MockSlave::MockSlave(const slave::Flags& flags,
containerizer,
&files,
&gc,
- statusUpdateManager = new slave::StatusUpdateManager(flags))
+ statusUpdateManager = new slave::StatusUpdateManager(flags),
+ &resourceEstimator)
{
// Set up default behaviors, calling the original methods.
EXPECT_CALL(*this, runTask(_, _, _, _, _))
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 563b833..df8cd20 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -174,6 +174,11 @@ protected:
MasterDetector* detector,
const Option<slave::Flags>& flags = None());
+ // Starts a slave with the specified resource estimator and flags.
+ virtual Try<process::PID<slave::Slave>> StartSlave(
+ mesos::slave::ResourceEstimator* resourceEstimator,
+ const Option<slave::Flags>& flags = None());
+
// Stop the specified master.
virtual void Stop(
const process::PID<master::Master>& pid);
@@ -694,6 +699,31 @@ public:
};
+class MockResourceEstimator : public mesos::slave::ResourceEstimator
+{
+public:
+ MockResourceEstimator()
+ {
+ // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+ // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
+ // for more details.
+ EXPECT_CALL(*this, initialize())
+ .WillRepeatedly(Return(Nothing()));
+
+ EXPECT_CALL(*this, oversubscribed())
+ .WillRepeatedly(Return(Resources()));
+ }
+
+ MOCK_METHOD0(
+ initialize,
+ Try<Nothing>());
+
+ MOCK_METHOD0(
+ oversubscribed,
+ process::Future<Resources>());
+};
+
+
// Definition of a mock Slave to be used in tests with gmock, covering
// potential races between runTask and killTask.
class MockSlave : public slave::Slave
@@ -757,6 +787,7 @@ public:
private:
Files files;
MockGarbageCollector gc;
+ MockResourceEstimator resourceEstimator;
slave::StatusUpdateManager* statusUpdateManager;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
new file mode 100644
index 0000000..64c2ede
--- /dev/null
+++ b/src/tests/oversubscription_tests.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/gtest.hpp>
+
+#include <stout/gtest.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class OversubscriptionSlaveTest : public MesosTest {};
+
+
+// This test verifies that the slave forwards the estimation of the
+// available oversubscribed resources to the master.
+TEST_F(OversubscriptionSlaveTest, UpdateOversubscribedResourcesMessage)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Future<UpdateOversubscribedResourcesMessage> message =
+ FUTURE_PROTOBUF(UpdateOversubscribedResourcesMessage(), _, _);
+
+ MockResourceEstimator resourceEstimator;
+
+ EXPECT_CALL(resourceEstimator, oversubscribed())
+ .WillRepeatedly(Return(Resources()));
+
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(message);
+
+ Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84168d17/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5bd722e..acae497 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -543,17 +543,10 @@ TEST_F(SlaveTest, ComamndTaskWithArguments)
// mesos-executor forking. For more details of this see MESOS-1873.
TEST_F(SlaveTest, GetExecutorInfo)
{
- // Create a thin dummy Slave to access underlying getExecutorInfo().
- // Testing this method should not necessarily require an integration
- // test as with most other methods here.
- slave::Flags flags = CreateSlaveFlags();
TestContainerizer containerizer;
StandaloneMasterDetector detector;
- Files files;
- slave::StatusUpdateManager updateManager(flags);
- slave::GarbageCollector gc;
- Slave slave(flags, &detector, &containerizer, &files, &gc, &updateManager);
+ MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
FrameworkID frameworkId;
frameworkId.set_value("20141010-221431-251662764-60288-32120-0000");
@@ -585,6 +578,7 @@ TEST_F(SlaveTest, GetExecutorInfo)
EXPECT_NE(string::npos, executor.command().value().find("mesos-executor"));
}
+
// This test runs a command without the command user field set. The
// command will verify the assumption that the command is run as the
// slave user (in this case, root).