You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/22 00:40:16 UTC

[2/3] mesos git commit: Started `LocalResourceProviderDaemon` after obtaining the slave ID.

Started `LocalResourceProviderDaemon` after obtaining the slave ID.

The local resource provider daemon is now started after we get a slave
id (either through registration or recovery). The slave id will be
eventually passed into each local resource provider for checkpointing
their metadata.

Review: https://reviews.apache.org/r/63376/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/138e5c03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/138e5c03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/138e5c03

Branch: refs/heads/master
Commit: 138e5c030a54b8ccd49b78108646267479ff9812
Parents: f8c40a7
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Nov 21 15:53:11 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Nov 21 16:08:23 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/daemon.cpp | 112 ++++++++++++++++++++++++----------
 src/resource_provider/daemon.hpp |   4 ++
 src/slave/slave.cpp              |   6 ++
 3 files changed, 91 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/resource_provider/daemon.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index d584eb9..4026c14 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -17,14 +17,15 @@
 #include "resource_provider/daemon.hpp"
 
 #include <utility>
-#include <vector>
 
 #include <glog/logging.h>
 
+#include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
 #include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/json.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
@@ -35,14 +36,18 @@
 
 #include "resource_provider/local.hpp"
 
+namespace http = process::http;
+
 using std::list;
 using std::string;
-using std::vector;
 
+using process::Failure;
+using process::Future;
 using process::Owned;
 using process::Process;
 using process::ProcessBase;
 
+using process::dispatch;
 using process::spawn;
 using process::terminate;
 using process::wait;
@@ -55,7 +60,7 @@ class LocalResourceProviderDaemonProcess
 {
 public:
   LocalResourceProviderDaemonProcess(
-      const process::http::URL& _url,
+      const http::URL& _url,
       const string& _workDir,
       const Option<string>& _configDir)
     : ProcessBase(process::ID::generate("local-resource-provider-daemon")),
@@ -63,31 +68,61 @@ public:
       workDir(_workDir),
       configDir(_configDir) {}
 
+  LocalResourceProviderDaemonProcess(
+      const LocalResourceProviderDaemonProcess& other) = delete;
+
+  LocalResourceProviderDaemonProcess& operator=(
+      const LocalResourceProviderDaemonProcess& other) = delete;
+
+  void start(const SlaveID& _slaveId);
+
 protected:
   void initialize() override;
 
 private:
-  struct Provider
+  struct ProviderData
   {
-    Provider(const ResourceProviderInfo& _info,
-             Owned<LocalResourceProvider> _provider)
-      : info(_info),
-        provider(std::move(_provider)) {}
+    ProviderData(const ResourceProviderInfo& _info)
+      : info(_info) {}
 
     const ResourceProviderInfo info;
-    const Owned<LocalResourceProvider> provider;
+    Owned<LocalResourceProvider> provider;
   };
 
   Try<Nothing> load(const string& path);
 
-  const process::http::URL url;
+  Future<Nothing> launch(const string& type, const string& name);
+
+  const http::URL url;
   const string workDir;
   const Option<string> configDir;
 
-  vector<Provider> providers;
+  Option<SlaveID> slaveId;
+  hashmap<string, hashmap<string, ProviderData>> providers;
 };
 
 
+void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
+{
+  CHECK_NONE(slaveId) << "Local resource provider daemon is already started";
+
+  slaveId = _slaveId;
+
+  foreachkey (const string& type, providers) {
+    foreachkey (const string& name, providers[type]) {
+      auto error = [=](const string& message) {
+        LOG(ERROR) << "Failed to launch resource provider with type '" << type
+                   << "' and name '" << name << "': " << message;
+      };
+
+      launch(type, name)
+        .onFailed(error)
+        .onDiscarded(std::bind(error, "future discarded"));
+    }
+  }
+}
+
+
 void LocalResourceProviderDaemonProcess::initialize()
 {
   if (configDir.isNone()) {
@@ -96,8 +131,9 @@ void LocalResourceProviderDaemonProcess::initialize()
 
   Try<list<string>> entries = os::ls(configDir.get());
   if (entries.isError()) {
-    LOG(ERROR) << "Unable to list the resource provider directory '"
+    LOG(FATAL) << "Unable to list the resource provider config directory '"
                << configDir.get() << "': " << entries.error();
+    return;
   }
 
   foreach (const string& entry, entries.get()) {
@@ -111,7 +147,6 @@ void LocalResourceProviderDaemonProcess::initialize()
     if (loading.isError()) {
       LOG(ERROR) << "Failed to load resource provider config '"
                  << path << "': " << loading.error();
-      continue;
     }
   }
 }
@@ -137,32 +172,44 @@ Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path)
   }
 
   // Ensure that ('type', 'name') pair is unique.
-  foreach (const Provider& provider, providers) {
-    if (info->type() == provider.info.type() &&
-        info->name() == provider.info.name()) {
-      return Error(
-          "Multiple resource providers with type '" + info->type() +
-          "' and name '" + info->name() + "'");
-    }
+  if (providers[info->type()].contains(info->name())) {
+    return Error(
+        "Multiple resource providers with type '" + info->type() +
+        "' and name '" + info->name() + "'");
   }
 
+  providers[info->type()].put(info->name(), info.get());
+
+  return Nothing();
+}
+
+
+Future<Nothing> LocalResourceProviderDaemonProcess::launch(
+    const string& type,
+    const string& name)
+{
+  CHECK_SOME(slaveId);
+  CHECK(providers[type].contains(name));
+
+  ProviderData& data = providers[type].at(name);
+
   Try<Owned<LocalResourceProvider>> provider =
-    LocalResourceProvider::create(url, info.get());
+    LocalResourceProvider::create(url, data.info);
 
   if (provider.isError()) {
-    return Error(
-        "Failed to create resource provider with type '" + info->type() +
-        "' and name '" + info->name() + "'");
+    return Failure(
+        "Failed to create resource provider with type '" + type +
+        "' and name '" + name + "': " + provider.error());
   }
 
-  providers.emplace_back(info.get(), provider.get());
+  data.provider = provider.get();
 
   return Nothing();
 }
 
 
 Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
-    const process::http::URL& url,
+    const http::URL& url,
     const slave::Flags& flags)
 {
   // We require that the config directory exists to create a daemon.
@@ -171,15 +218,12 @@ Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
     return Error("Config directory '" + configDir.get() + "' does not exist");
   }
 
-  return new LocalResourceProviderDaemon(
-      url,
-      flags.work_dir,
-      configDir);
+  return new LocalResourceProviderDaemon(url, flags.work_dir, configDir);
 }
 
 
 LocalResourceProviderDaemon::LocalResourceProviderDaemon(
-    const process::http::URL& url,
+    const http::URL& url,
     const string& workDir,
     const Option<string>& configDir)
   : process(new LocalResourceProviderDaemonProcess(url, workDir, configDir))
@@ -194,5 +238,11 @@ LocalResourceProviderDaemon::~LocalResourceProviderDaemon()
   wait(process.get());
 }
 
+
+void LocalResourceProviderDaemon::start(const SlaveID& slaveId)
+{
+  dispatch(process.get(), &LocalResourceProviderDaemonProcess::start, slaveId);
+}
+
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/resource_provider/daemon.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.hpp b/src/resource_provider/daemon.hpp
index ef6c356..8668cd1 100644
--- a/src/resource_provider/daemon.hpp
+++ b/src/resource_provider/daemon.hpp
@@ -19,6 +19,8 @@
 
 #include <string>
 
+#include <mesos/mesos.hpp>
+
 #include <process/http.hpp>
 #include <process/owned.hpp>
 
@@ -52,6 +54,8 @@ public:
   LocalResourceProviderDaemon& operator=(
       const LocalResourceProviderDaemon& other) = delete;
 
+  void start(const SlaveID& slaveId);
+
 private:
   LocalResourceProviderDaemon(
       const process::http::URL& url,

http://git-wip-us.apache.org/repos/asf/mesos/blob/138e5c03/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6e9adc6..4914194 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1238,6 +1238,9 @@ void Slave::registered(
         CHECK_SOME(state::checkpoint(path, info_));
       }
 
+      // Start the local resource providers daemon once we have the slave id.
+      localResourceProviderDaemon->start(info.id());
+
       // Setup a timer so that the agent attempts to re-register if it
       // doesn't receive a ping from the master for an extended period
       // of time. This needs to be done once registered, in case we
@@ -6280,6 +6283,9 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
     } else {
       info = slaveState->info.get(); // Recover the slave info.
 
+      // Start the local resource providers daemon once we have the slave id.
+      localResourceProviderDaemon->start(info.id());
+
       // Recover the frameworks.
       foreachvalue (const FrameworkState& frameworkState,
                     slaveState->frameworks) {