You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/02 01:32:10 UTC

[3/5] mesos git commit: Initialized and subscribed storage local resource provider.

Initialized and subscribed storage local resource provider.

This patch validates `ResourceProviderInfo` for storage local resource
providers upon creation. During initialization, the storage local
resource provider first tries to recover its ID from the last session
by reading the actual path linked by
`<work_dir>/meta/resource_providers/<type>/<name>/latest`,
then subscribes to the agent's resource provider manager.

This patch is split from https://reviews.apache.org/r/63021/.

Review: https://reviews.apache.org/r/63823/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/619f4ae1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/619f4ae1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/619f4ae1

Branch: refs/heads/master
Commit: 619f4ae1a67d7110097f7b733741314c211e72c0
Parents: 5044610
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:17 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 235 +++++++++++++++++++++++-
 1 file changed, 229 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/619f4ae1/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f586afc..eee857b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>
 
 #include <process/defer.hpp>
+#include <process/delay.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
@@ -28,27 +29,37 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/path.hpp>
+
+#include <stout/os/realpath.hpp>
+
+#include "common/http.hpp"
+
 #include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
 
 #include "resource_provider/detector.hpp"
 
+#include "slave/paths.hpp"
+
 namespace http = process::http;
 
 using std::queue;
 using std::string;
 
+using process::Failure;
+using process::Future;
 using process::Owned;
 using process::Process;
 
 using process::defer;
 using process::spawn;
-using process::terminate;
-using process::wait;
 
 using process::http::authentication::Principal;
 
 using mesos::ResourceProviderInfo;
 
+using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 
 using mesos::v1::resource_provider::Driver;
@@ -73,6 +84,23 @@ static bool isValidName(const string& s)
 }
 
 
+// Returns true if the string is a valid Java package name.
+static bool isValidType(const string& s)
+{
+  if (s.empty()) {
+    return false;
+  }
+
+  foreach (const string& token, strings::split(s, ".")) {
+    if (!isValidName(token)) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
 // Returns a prefix for naming standalone containers to run CSI plugins
 // for the resource provider. The prefix is of the following format:
 //     <rp_type>-<rp_name>--
@@ -100,8 +128,10 @@ public:
       const SlaveID& _slaveId,
       const Option<string>& _authToken)
     : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+      state(RECOVERING),
       url(_url),
       workDir(_workDir),
+      metaDir(slave::paths::getMetaRootDir(_workDir)),
       contentType(ContentType::PROTOBUF),
       info(_info),
       slaveId(_slaveId),
@@ -119,42 +149,75 @@ public:
 
 private:
   void initialize() override;
+  void fatal(const string& messsage, const string& failure);
+
+  Future<Nothing> recover();
+  void doReliableRegistration();
+
+  // Functions for received events.
+  void subscribed(const Event::Subscribed& subscribed);
+  void operation(const Event::Operation& operation);
+  void publish(const Event::Publish& publish);
+
+  enum State
+  {
+    RECOVERING,
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED,
+    READY
+  } state;
 
   const http::URL url;
   const string workDir;
+  const string metaDir;
   const ContentType contentType;
   ResourceProviderInfo info;
   const SlaveID slaveId;
+  const Option<string> authToken;
+
   Owned<v1::resource_provider::Driver> driver;
-  Option<string> authToken;
 };
 
 
 void StorageLocalResourceProviderProcess::connected()
 {
+  CHECK_EQ(DISCONNECTED, state);
+
+  state = CONNECTED;
+
+  doReliableRegistration();
 }
 
 
 void StorageLocalResourceProviderProcess::disconnected()
 {
+  CHECK(state == CONNECTED || state == SUBSCRIBED || state == READY);
+
+  LOG(INFO) << "Disconnected from resource provider manager";
+
+  state = DISCONNECTED;
 }
 
 
 void StorageLocalResourceProviderProcess::received(const Event& event)
 {
-  // TODO(jieyu): Print resource provider ID.
   LOG(INFO) << "Received " << event.type() << " event";
 
   switch (event.type()) {
     case Event::SUBSCRIBED: {
+      CHECK(event.has_subscribed());
+      subscribed(event.subscribed());
       break;
     }
     case Event::OPERATION: {
       CHECK(event.has_operation());
+      operation(event.operation());
       break;
     }
     case Event::PUBLISH: {
       CHECK(event.has_publish());
+      publish(event.publish());
       break;
     }
     case Event::UNKNOWN: {
@@ -167,6 +230,53 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
+  const string message =
+    "Failed to recover resource provider with type '" + info.type() +
+    "' and name '" + info.name() + "'";
+
+  recover()
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+}
+
+
+void StorageLocalResourceProviderProcess::fatal(
+    const string& message,
+    const string& failure)
+{
+  LOG(ERROR) << message << ": " << failure;
+
+  // Force the disconnection early.
+  driver.reset();
+
+  process::terminate(self());
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::recover()
+{
+  CHECK_EQ(RECOVERING, state);
+
+  // Recover the resource provider ID from the latest symlink. If the
+  // symlink does not exist or it points to a non-exist directory,
+  // treat this as a new resource provider.
+  // TODO(chhsiao): State recovery.
+  Result<string> realpath = os::realpath(
+      slave::paths::getLatestResourceProviderPath(
+          metaDir, slaveId, info.type(), info.name()));
+
+  if (realpath.isError()) {
+    return Failure(
+        "Failed to read the latest symlink for resource provider with type '" +
+        info.type() + "' and name '" + info.name() + "': " + realpath.error());
+  }
+
+  if (realpath.isSome()) {
+    info.mutable_id()->set_value(Path(realpath.get()).basename());
+  }
+
+  state = DISCONNECTED;
+
   driver.reset(new Driver(
       Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
       contentType,
@@ -180,6 +290,78 @@ void StorageLocalResourceProviderProcess::initialize()
         }
       }),
       None())); // TODO(nfnt): Add authentication as part of MESOS-7854.
+
+  return Nothing();
+}
+
+
+void StorageLocalResourceProviderProcess::doReliableRegistration()
+{
+  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
+    return;
+  }
+
+  CHECK_EQ(CONNECTED, state);
+
+  const string message =
+    "Failed to subscribe resource provider with type '" + info.type() +
+    "' and name '" + info.name() + "'";
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+  driver->send(evolve(call));
+
+  // TODO(chhsiao): Consider doing an exponential backoff.
+  delay(Seconds(1), self(), &Self::doReliableRegistration);
+}
+
+
+void StorageLocalResourceProviderProcess::subscribed(
+    const Event::Subscribed& subscribed)
+{
+  CHECK_EQ(CONNECTED, state);
+
+  LOG(INFO) << "Subscribed with ID " << subscribed.provider_id().value();
+
+  state = SUBSCRIBED;
+
+  if (!info.has_id()) {
+    // New subscription.
+    info.mutable_id()->CopyFrom(subscribed.provider_id());
+    slave::paths::createResourceProviderDirectory(
+        metaDir,
+        slaveId,
+        info.type(),
+        info.name(),
+        info.id());
+  }
+}
+
+
+void StorageLocalResourceProviderProcess::operation(
+    const Event::Operation& operation)
+{
+  if (state == SUBSCRIBED) {
+    // TODO(chhsiao): Reject this operation.
+    return;
+  }
+
+  CHECK_EQ(READY, state);
+}
+
+
+void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
+{
+  if (state == SUBSCRIBED) {
+    // TODO(chhsiao): Reject this publish request.
+    return;
+  }
+
+  CHECK_EQ(READY, state);
 }
 
 
@@ -199,6 +381,47 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
         "' does not follow Java package naming convention");
   }
 
+  if (!info.has_storage()) {
+    return Error("'ResourceProviderInfo.storage' must be set");
+  }
+
+  // Verify that the type and name of the CSI plugin follow Java package
+  // naming convention.
+  // TODO(chhsiao): We should move this check to a validation function
+  // for `CSIPluginInfo`.
+  if (!isValidType(info.storage().type()) ||
+      !isValidName(info.storage().name())) {
+    return Error(
+        "CSI plugin type '" + info.storage().type() +
+        "' and name '" + info.storage().name() +
+        "' does not follow Java package naming convention");
+  }
+
+  bool hasControllerService = false;
+  bool hasNodeService = false;
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    for (int i = 0; i < container.services_size(); i++) {
+      const CSIPluginContainerInfo::Service service = container.services(i);
+      if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) {
+        hasControllerService = true;
+      } else if (service == CSIPluginContainerInfo::NODE_SERVICE) {
+        hasNodeService = true;
+      }
+    }
+  }
+
+  if (!hasControllerService) {
+    return Error(
+        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
+  }
+
+  if (!hasNodeService) {
+    return Error(
+        stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");
+  }
+
   return Owned<LocalResourceProvider>(
       new StorageLocalResourceProvider(url, workDir, info, slaveId, authToken));
 }
@@ -228,8 +451,8 @@ StorageLocalResourceProvider::StorageLocalResourceProvider(
 
 StorageLocalResourceProvider::~StorageLocalResourceProvider()
 {
-  terminate(process.get());
-  wait(process.get());
+  process::terminate(process.get());
+  process::wait(process.get());
 }
 
 } // namespace internal {