You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/22 00:40:16 UTC
[2/3] mesos git commit: Started `LocalResourceProviderDaemon` after
obtaining the slave ID.
Started `LocalResourceProviderDaemon` after obtaining the slave ID.
The local resource provider daemon is now started after we get a slave
id (either through registration or recovery). The slave id will be
eventually passed into each local resource provider for checkpointing
their metadata.
Review: https://reviews.apache.org/r/63376/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/138e5c03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/138e5c03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/138e5c03
Branch: refs/heads/master
Commit: 138e5c030a54b8ccd49b78108646267479ff9812
Parents: f8c40a7
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Nov 21 15:53:11 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Nov 21 16:08:23 2017 -0800
----------------------------------------------------------------------
src/resource_provider/daemon.cpp | 112 ++++++++++++++++++++++++----------
src/resource_provider/daemon.hpp | 4 ++
src/slave/slave.cpp | 6 ++
3 files changed, 91 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/resource_provider/daemon.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index d584eb9..4026c14 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -17,14 +17,15 @@
#include "resource_provider/daemon.hpp"
#include <utility>
-#include <vector>
#include <glog/logging.h>
+#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
#include <stout/json.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
@@ -35,14 +36,18 @@
#include "resource_provider/local.hpp"
+namespace http = process::http;
+
using std::list;
using std::string;
-using std::vector;
+using process::Failure;
+using process::Future;
using process::Owned;
using process::Process;
using process::ProcessBase;
+using process::dispatch;
using process::spawn;
using process::terminate;
using process::wait;
@@ -55,7 +60,7 @@ class LocalResourceProviderDaemonProcess
{
public:
LocalResourceProviderDaemonProcess(
- const process::http::URL& _url,
+ const http::URL& _url,
const string& _workDir,
const Option<string>& _configDir)
: ProcessBase(process::ID::generate("local-resource-provider-daemon")),
@@ -63,31 +68,61 @@ public:
workDir(_workDir),
configDir(_configDir) {}
+ LocalResourceProviderDaemonProcess(
+ const LocalResourceProviderDaemonProcess& other) = delete;
+
+ LocalResourceProviderDaemonProcess& operator=(
+ const LocalResourceProviderDaemonProcess& other) = delete;
+
+ void start(const SlaveID& _slaveId);
+
protected:
void initialize() override;
private:
- struct Provider
+ struct ProviderData
{
- Provider(const ResourceProviderInfo& _info,
- Owned<LocalResourceProvider> _provider)
- : info(_info),
- provider(std::move(_provider)) {}
+ ProviderData(const ResourceProviderInfo& _info)
+ : info(_info) {}
const ResourceProviderInfo info;
- const Owned<LocalResourceProvider> provider;
+ Owned<LocalResourceProvider> provider;
};
Try<Nothing> load(const string& path);
- const process::http::URL url;
+ Future<Nothing> launch(const string& type, const string& name);
+
+ const http::URL url;
const string workDir;
const Option<string> configDir;
- vector<Provider> providers;
+ Option<SlaveID> slaveId;
+ hashmap<string, hashmap<string, ProviderData>> providers;
};
+void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
+{
+ CHECK_NONE(slaveId) << "Local resource provider daemon is already started";
+
+ slaveId = _slaveId;
+
+ foreachkey (const string& type, providers) {
+ foreachkey (const string& name, providers[type]) {
+ auto error = [=](const string& message) {
+ LOG(ERROR) << "Failed to launch resource provider with type '" << type
+ << "' and name '" << name << "': " << message;
+ };
+
+ launch(type, name)
+ .onFailed(error)
+ .onDiscarded(std::bind(error, "future discarded"));
+ }
+ }
+}
+
+
void LocalResourceProviderDaemonProcess::initialize()
{
if (configDir.isNone()) {
@@ -96,8 +131,9 @@ void LocalResourceProviderDaemonProcess::initialize()
Try<list<string>> entries = os::ls(configDir.get());
if (entries.isError()) {
- LOG(ERROR) << "Unable to list the resource provider directory '"
+ LOG(ERROR) << "Unable to list the resource provider config directory '"
<< configDir.get() << "': " << entries.error();
+ return;
}
foreach (const string& entry, entries.get()) {
@@ -111,7 +147,6 @@ void LocalResourceProviderDaemonProcess::initialize()
if (loading.isError()) {
LOG(ERROR) << "Failed to load resource provider config '"
<< path << "': " << loading.error();
- continue;
}
}
}
@@ -137,32 +172,44 @@ Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path)
}
// Ensure that ('type', 'name') pair is unique.
- foreach (const Provider& provider, providers) {
- if (info->type() == provider.info.type() &&
- info->name() == provider.info.name()) {
- return Error(
- "Multiple resource providers with type '" + info->type() +
- "' and name '" + info->name() + "'");
- }
+ if (providers[info->type()].contains(info->name())) {
+ return Error(
+ "Multiple resource providers with type '" + info->type() +
+ "' and name '" + info->name() + "'");
}
+ providers[info->type()].put(info->name(), info.get());
+
+ return Nothing();
+}
+
+
+Future<Nothing> LocalResourceProviderDaemonProcess::launch(
+ const string& type,
+ const string& name)
+{
+ CHECK_SOME(slaveId);
+ CHECK(providers[type].contains(name));
+
+ ProviderData& data = providers[type].at(name);
+
Try<Owned<LocalResourceProvider>> provider =
- LocalResourceProvider::create(url, info.get());
+ LocalResourceProvider::create(url, data.info);
if (provider.isError()) {
- return Error(
- "Failed to create resource provider with type '" + info->type() +
- "' and name '" + info->name() + "'");
+ return Failure(
+ "Failed to create resource provider with type '" + type +
+ "' and name '" + name + "': " + provider.error());
}
- providers.emplace_back(info.get(), provider.get());
+ data.provider = provider.get();
return Nothing();
}
Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
- const process::http::URL& url,
+ const http::URL& url,
const slave::Flags& flags)
{
// We require that the config directory exists to create a daemon.
@@ -171,15 +218,12 @@ Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
return Error("Config directory '" + configDir.get() + "' does not exist");
}
- return new LocalResourceProviderDaemon(
- url,
- flags.work_dir,
- configDir);
+ return new LocalResourceProviderDaemon(url, flags.work_dir, configDir);
}
LocalResourceProviderDaemon::LocalResourceProviderDaemon(
- const process::http::URL& url,
+ const http::URL& url,
const string& workDir,
const Option<string>& configDir)
: process(new LocalResourceProviderDaemonProcess(url, workDir, configDir))
@@ -194,5 +238,11 @@ LocalResourceProviderDaemon::~LocalResourceProviderDaemon()
wait(process.get());
}
+
+void LocalResourceProviderDaemon::start(const SlaveID& slaveId)
+{
+ dispatch(process.get(), &LocalResourceProviderDaemonProcess::start, slaveId);
+}
+
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/resource_provider/daemon.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.hpp b/src/resource_provider/daemon.hpp
index ef6c356..8668cd1 100644
--- a/src/resource_provider/daemon.hpp
+++ b/src/resource_provider/daemon.hpp
@@ -19,6 +19,8 @@
#include <string>
+#include <mesos/mesos.hpp>
+
#include <process/http.hpp>
#include <process/owned.hpp>
@@ -52,6 +54,8 @@ public:
LocalResourceProviderDaemon& operator=(
const LocalResourceProviderDaemon& other) = delete;
+ void start(const SlaveID& slaveId);
+
private:
LocalResourceProviderDaemon(
const process::http::URL& url,
http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6e9adc6..4914194 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1238,6 +1238,9 @@ void Slave::registered(
CHECK_SOME(state::checkpoint(path, info_));
}
+ // Start the local resource providers daemon once we have the slave id.
+ localResourceProviderDaemon->start(info.id());
+
// Setup a timer so that the agent attempts to re-register if it
// doesn't receive a ping from the master for an extended period
// of time. This needs to be done once registered, in case we
@@ -6280,6 +6283,9 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
} else {
info = slaveState->info.get(); // Recover the slave info.
+ // Start the local resource providers daemon once we have the slave id.
+ localResourceProviderDaemon->start(info.id());
+
// Recover the frameworks.
foreachvalue (const FrameworkState& frameworkState,
slaveState->frameworks) {