You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/02 01:32:08 UTC

[1/5] mesos git commit: Fixed the broken CMake build.

Repository: mesos
Updated Branches:
  refs/heads/master 5044610ca -> fc6cbbfec


Fixed the broken CMake build.

Currently, gRPC support has not yet been added to the CMake build system.
Thus, we cannot yet build the storage local resource provider, which
depends on gRPC, with CMake.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fc6cbbfe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fc6cbbfe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fc6cbbfe

Branch: refs/heads/master
Commit: fc6cbbfece5a3e9a45d55772e8da94f517c6d1f2
Parents: 089e789
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Dec 1 17:18:52 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fc6cbbfe/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 318cb40..592489d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -478,8 +478,7 @@ set(RESOURCE_PROVIDER_SRC
   resource_provider/local.cpp
   resource_provider/manager.cpp
   resource_provider/registrar.cpp
-  resource_provider/validation.cpp
-  resource_provider/storage/provider.cpp)
+  resource_provider/validation.cpp)
 
 set(SCHEDULER_SRC
   sched/sched.cpp


[2/5] mesos git commit: Recover controller and node services and clean up unused containers.

Posted by ji...@apache.org.
Recover controller and node services and clean up unused containers.

The storage local resource provider now scans through the list of plugin
containers from

  `<work_dir>/csi/<csi_type>/<csi_name>/containers/<container_id>`,

and kills containers that are no longer in use, then starts up the
containers for the controller and node services.

Review: https://reviews.apache.org/r/64044/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3adab348
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3adab348
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3adab348

Branch: refs/heads/master
Commit: 3adab348e422b47121b7d0ef19d84701660be531
Parents: 434a094
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:58 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 344 ++++++++++++++++++++++--
 1 file changed, 317 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3adab348/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index bbf168a..a57ef15 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -22,6 +22,7 @@
 #include <glog/logging.h>
 
 #include <process/after.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
@@ -43,6 +44,7 @@
 #include <stout/os/mkdir.hpp>
 #include <stout/os/realpath.hpp>
 #include <stout/os/rm.hpp>
+#include <stout/os/rmdir.hpp>
 
 #include "common/http.hpp"
 
@@ -57,10 +59,12 @@
 
 #include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
+#include "slave/state.hpp"
 
 namespace http = process::http;
 
 using std::find;
+using std::list;
 using std::queue;
 using std::string;
 
@@ -75,6 +79,7 @@ using process::Promise;
 using process::Timeout;
 
 using process::after;
+using process::collect;
 using process::defer;
 using process::loop;
 using process::spawn;
@@ -203,6 +208,20 @@ static inline http::URL extractParentEndpoint(const http::URL& url)
 }
 
 
+// Returns the 'Bearer' credential as a header for calling the V1 agent
+// API if the `authToken` is present, or empty otherwise.
+static inline http::Headers getAuthHeader(const Option<string>& authToken)
+{
+  http::Headers headers;
+
+  if (authToken.isSome()) {
+    headers["Authorization"] = "Bearer " + authToken.get();
+  }
+
+  return headers;
+}
+
+
 class StorageLocalResourceProviderProcess
   : public Process<StorageLocalResourceProviderProcess>
 {
@@ -238,6 +257,7 @@ private:
   void fatal(const string& messsage, const string& failure);
 
   Future<Nothing> recover();
+  Future<Nothing> recoverServices();
   void doReliableRegistration();
 
   // Functions for received events.
@@ -247,6 +267,10 @@ private:
 
   Future<csi::Client> connect(const string& endpoint);
   Future<csi::Client> getService(const ContainerID& containerId);
+  Future<Nothing> killService(const ContainerID& containerId);
+
+  Future<Nothing> prepareControllerService();
+  Future<Nothing> prepareNodeService();
 
   enum State
   {
@@ -273,6 +297,11 @@ private:
   ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
   hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
+
+  Option<csi::GetPluginInfoResponse> controllerInfo;
+  Option<csi::GetPluginInfoResponse> nodeInfo;
+  Option<csi::ControllerCapabilities> controllerCapabilities;
+  Option<string> nodeId;
 };
 
 
@@ -382,41 +411,125 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  // Recover the resource provider ID from the latest symlink. If the
-  // symlink does not exist or it points to a non-exist directory,
-  // treat this as a new resource provider.
-  // TODO(chhsiao): State recovery.
-  Result<string> realpath = os::realpath(
-      slave::paths::getLatestResourceProviderPath(
-          metaDir, slaveId, info.type(), info.name()));
+  return recoverServices()
+    .then(defer(self(), [=]() -> Future<Nothing> {
+      // Recover the resource provider ID from the latest symlink. If
+      // the symlink does not exist or it points to a non-exist
+      // directory, treat this as a new resource provider.
+      // TODO(chhsiao): State recovery.
+      Result<string> realpath = os::realpath(
+          slave::paths::getLatestResourceProviderPath(
+              metaDir, slaveId, info.type(), info.name()));
+
+      if (realpath.isError()) {
+        return Failure(
+            "Failed to read the latest symlink for resource provider with "
+            "type '" + info.type() + "' and name '" + info.name() + "'"
+            ": " + realpath.error());
+      }
+
+      if (realpath.isSome()) {
+        info.mutable_id()->set_value(Path(realpath.get()).basename());
+      }
+
+      state = DISCONNECTED;
+
+      driver.reset(new Driver(
+          Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
+          contentType,
+          defer(self(), &Self::connected),
+          defer(self(), &Self::disconnected),
+          defer(self(), [this](queue<v1::resource_provider::Event> events) {
+            while(!events.empty()) {
+              const v1::resource_provider::Event& event = events.front();
+              received(devolve(event));
+              events.pop();
+            }
+          }),
+          None())); // TODO(nfnt): Add authentication as part of MESOS-7854.
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
+{
+  Try<list<string>> containerPaths = csi::paths::getContainerPaths(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name());
 
-  if (realpath.isError()) {
+  if (containerPaths.isError()) {
     return Failure(
-        "Failed to read the latest symlink for resource provider with type '" +
-        info.type() + "' and name '" + info.name() + "': " + realpath.error());
+        "Failed to find plugin containers for CSI plugin type '" +
+        info.storage().type() + "' and name '" + info.storage().name() + ": " +
+        containerPaths.error());
   }
 
-  if (realpath.isSome()) {
-    info.mutable_id()->set_value(Path(realpath.get()).basename());
-  }
+  list<Future<Nothing>> futures;
+
+  foreach (const string& path, containerPaths.get()) {
+    ContainerID containerId;
+    containerId.set_value(Path(path).basename());
+
+    // Do not kill the up-to-date controller or node container.
+    // Otherwise, kill them and perform cleanups.
+    if (containerId == controllerContainerId ||
+        containerId == nodeContainerId) {
+      const string configPath = csi::paths::getContainerInfoPath(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().type(),
+          info.storage().name(),
+          containerId);
+
+      Result<CSIPluginContainerInfo> config =
+        ::protobuf::read<CSIPluginContainerInfo>(configPath);
+
+      if (config.isError()) {
+        return Failure(
+            "Failed to read plugin container config from '" +
+            configPath + "': " + config.error());
+      }
 
-  state = DISCONNECTED;
+      if (config.isSome() &&
+          getCSIPluginContainerInfo(info, containerId) == config.get()) {
+        continue;
+      }
+    }
+
+    futures.push_back(killService(containerId)
+      .then(defer(self(), [=]() -> Future<Nothing> {
+        Result<string> endpointDir =
+          os::realpath(csi::paths::getEndpointDirSymlinkPath(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().type(),
+              info.storage().name(),
+              containerId));
+
+        if (endpointDir.isSome()) {
+          Try<Nothing> rmdir = os::rmdir(endpointDir.get());
+          if (rmdir.isError()) {
+            return Failure(
+                "Failed to remove endpoint directory '" + endpointDir.get() +
+                "': " + rmdir.error());
+          }
+        }
 
-  driver.reset(new Driver(
-      Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
-      contentType,
-      defer(self(), &Self::connected),
-      defer(self(), &Self::disconnected),
-      defer(self(), [this](queue<v1::resource_provider::Event> events) {
-        while(!events.empty()) {
-          const v1::resource_provider::Event& event = events.front();
-          received(devolve(event));
-          events.pop();
+        Try<Nothing> rmdir = os::rmdir(path);
+        if (rmdir.isError()) {
+          return Failure(
+              "Failed to remove plugin container directory '" + path + "': " +
+              rmdir.error());
         }
-      }),
-      None())); // TODO(nfnt): Add authentication as part of MESOS-7854.
 
-  return Nothing();
+        return Nothing();
+      })));
+  }
+
+  return collect(futures)
+    .then(defer(self(), &Self::prepareNodeService))
+    .then(defer(self(), &Self::prepareControllerService));
 }
 
 
@@ -668,6 +781,20 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
         stringify(containerId) + "': " + daemon.error());
   }
 
+  // Checkpoint the plugin container config.
+  const string configPath = csi::paths::getContainerInfoPath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name(),
+      containerId);
+
+  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
+  if (checkpoint.isError()) {
+    return Failure(
+        "Failed to checkpoint plugin container config to '" + configPath +
+        "': " + checkpoint.error());
+  }
+
   const string message =
     "Container daemon for '" + stringify(containerId) + "' failed";
 
@@ -680,6 +807,169 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
 }
 
 
+// Kills the specified plugin container and returns a future that waits
+// for it to terminate.
+Future<Nothing> StorageLocalResourceProviderProcess::killService(
+    const ContainerID& containerId)
+{
+  CHECK(!daemons.contains(containerId));
+  CHECK(!services.contains(containerId));
+
+  agent::Call call;
+  call.set_type(agent::Call::KILL_CONTAINER);
+  call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);
+
+  return http::post(
+      extractParentEndpoint(url),
+      getAuthHeader(authToken),
+      serialize(contentType, evolve(call)),
+      stringify(contentType))
+    .then(defer(self(), [=](const http::Response& response) -> Future<Nothing> {
+      if (response.status == http::NotFound().status) {
+        return Nothing();
+      }
+
+      if (response.status != http::OK().status) {
+        return Failure(
+            "Failed to kill container '" + stringify(containerId) +
+            "': Unexpected response '" + response.status + "' (" + response.body
+            + ")");
+      }
+
+      agent::Call call;
+      call.set_type(agent::Call::WAIT_CONTAINER);
+      call.mutable_wait_container()
+        ->mutable_container_id()->CopyFrom(containerId);
+
+      return http::post(
+          extractParentEndpoint(url),
+          getAuthHeader(authToken),
+          serialize(contentType, evolve(call)),
+          stringify(contentType))
+        .then(defer(self(), [=](
+            const http::Response& response) -> Future<Nothing> {
+          if (response.status != http::OK().status &&
+              response.status != http::NotFound().status) {
+            return Failure(
+                "Failed to wait for container '" + stringify(containerId) +
+                "': Unexpected response '" + response.status + "' (" +
+                response.body + ")");
+          }
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+{
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      // Get the plugin info and check for consistency.
+      csi::GetPluginInfoRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.GetPluginInfo(request)
+        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
+          controllerInfo = response;
+
+          LOG(INFO)
+            << "Controller plugin loaded: " << stringify(controllerInfo.get());
+
+          if (nodeInfo.isSome() &&
+              (controllerInfo->name() != nodeInfo->name() ||
+               controllerInfo->vendor_version() !=
+                 nodeInfo->vendor_version())) {
+            LOG(WARNING)
+              << "Inconsistent controller and node plugin components. Please "
+                 "check with the plugin vendor to ensure compatibility.";
+          }
+
+          // NOTE: We always get the latest service future before
+          // proceeding to the next step.
+          return getService(controllerContainerId);
+        }));
+    }))
+    .then(defer(self(), [=](csi::Client client) {
+      // Probe the plugin to validate the runtime environment.
+      csi::ControllerProbeRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.ControllerProbe(request)
+        .then(defer(self(), [=](const csi::ControllerProbeResponse& response) {
+          return getService(controllerContainerId);
+        }));
+    }))
+    .then(defer(self(), [=](csi::Client client) {
+      // Get the controller capabilities.
+      csi::ControllerGetCapabilitiesRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.ControllerGetCapabilities(request)
+        .then(defer(self(), [=](
+            const csi::ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+{
+  return getService(nodeContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      // Get the plugin info and check for consistency.
+      csi::GetPluginInfoRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.GetPluginInfo(request)
+        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
+          nodeInfo = response;
+
+          LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get());
+
+          if (controllerInfo.isSome() &&
+              (controllerInfo->name() != nodeInfo->name() ||
+               controllerInfo->vendor_version() !=
+                 nodeInfo->vendor_version())) {
+            LOG(WARNING)
+              << "Inconsistent controller and node plugin components. Please "
+                 "check with the plugin vendor to ensure compatibility.";
+          }
+
+          // NOTE: We always get the latest service future before
+          // proceeding to the next step.
+          return getService(nodeContainerId);
+        }));
+    }))
+    .then(defer(self(), [=](csi::Client client) {
+      // Probe the plugin to validate the runtime environment.
+      csi::NodeProbeRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.NodeProbe(request)
+        .then(defer(self(), [=](const csi::NodeProbeResponse& response) {
+          return getService(nodeContainerId);
+        }));
+    }))
+    .then(defer(self(), [=](csi::Client client) {
+      // Get the node ID.
+      csi::GetNodeIDRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.GetNodeID(request)
+        .then(defer(self(), [=](const csi::GetNodeIDResponse& response) {
+          nodeId = response.node_id();
+
+          return Nothing();
+        }));
+    }));
+}
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,


[5/5] mesos git commit: Added `getService()` function to launch CSI plugins.

Posted by ji...@apache.org.
Added `getService()` function to launch CSI plugins.

The `getService()` method first checks if there is already a container
daemon for the specified plugin component, and creates a new one if not.
The post-start hook for the container daemon will call `connect()` to
wait for the endpoint socket file to appear and connect to it, then
set up the corresponding promise of CSI client. The post-stop hook will
remove the socket file and create a new promise for the next call to the
post-start hook to set it up.

Review: https://reviews.apache.org/r/63021/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/434a0941
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/434a0941
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/434a0941

Branch: refs/heads/master
Commit: 434a09410b8ecf4c163ddba441db40cc1a2f711d
Parents: 619f4ae
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:39 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 include/mesos/type_utils.hpp               |   5 +
 include/mesos/v1/mesos.hpp                 |   5 +
 src/common/type_utils.cpp                  |   8 +
 src/resource_provider/storage/provider.cpp | 317 +++++++++++++++++++++++-
 src/v1/mesos.cpp                           |   8 +
 5 files changed, 342 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index a348c7d..d28d538 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -429,6 +429,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
 
 std::ostream& operator<<(
     std::ostream& stream,
+    const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
     const FrameworkInfo::Capability& capability);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index d1401fb..de61c36 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -423,6 +423,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
 
 std::ostream& operator<<(
     std::ostream& stream,
+    const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
     const FrameworkInfo::Capability& capability);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 3657d55..a272221 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -743,6 +743,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
 
 ostream& operator<<(
     ostream& stream,
+    const CSIPluginContainerInfo::Service& service)
+{
+  return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+    ostream& stream,
     const FrameworkInfo::Capability& capability)
 {
   return stream << FrameworkInfo::Capability::Type_Name(capability.type());

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index eee857b..bbf168a 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -16,48 +16,72 @@
 
 #include "resource_provider/storage/provider.hpp"
 
+#include <algorithm>
 #include <cctype>
 
 #include <glog/logging.h>
 
+#include <process/after.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 #include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <mesos/type_utils.hpp>
 
 #include <mesos/resource_provider/resource_provider.hpp>
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/path.hpp>
 
+#include <stout/os/exists.hpp>
+#include <stout/os/mkdir.hpp>
 #include <stout/os/realpath.hpp>
+#include <stout/os/rm.hpp>
 
 #include "common/http.hpp"
 
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
+
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
 #include "resource_provider/detector.hpp"
 
+#include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
 
 namespace http = process::http;
 
+using std::find;
 using std::queue;
 using std::string;
 
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Process;
+using process::Promise;
+using process::Timeout;
 
+using process::after;
 using process::defer;
+using process::loop;
 using process::spawn;
 
 using process::http::authentication::Principal;
 
-using mesos::ResourceProviderInfo;
+using mesos::internal::slave::ContainerDaemon;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
@@ -101,6 +125,10 @@ static bool isValidType(const string& s)
 }
 
 
+// Timeout for a CSI plugin component to create its endpoint socket.
+static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Seconds(5);
+
+
 // Returns a prefix for naming standalone containers to run CSI plugins
 // for the resource provider. The prefix is of the following format:
 //     <rp_type>-<rp_name>--
@@ -117,6 +145,64 @@ static inline string getContainerIdPrefix(const ResourceProviderInfo& info)
 }
 
 
+// Returns the container ID of the standalone container to run a CSI
+// plugin component. The container ID is of the following format:
+//     <rp_type>-<rp_name>--<csi_type>-<csi_name>--<list_of_services>
+// where <rp_type> and <rp_name> are the type and name of the resource
+// provider, and <csi_type> and <csi_name> are the type and name of the
+// CSI plugin, with dots replaced by dashes. <list_of_services> lists
+// the CSI services provided by the component, concatenated with dashes.
+static inline ContainerID getContainerId(
+    const ResourceProviderInfo& info,
+    const CSIPluginContainerInfo& container)
+{
+  string value = getContainerIdPrefix(info);
+
+  value += strings::join(
+      "-",
+      strings::replace(info.storage().type(), ".", "-"),
+      info.storage().name(),
+      "");
+
+  for (int i = 0; i < container.services_size(); i++) {
+    value += "-" + stringify(container.services(i));
+  }
+
+  ContainerID containerId;
+  containerId.set_value(value);
+
+  return containerId;
+}
+
+
+static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo(
+    const ResourceProviderInfo& info,
+    const ContainerID& containerId)
+{
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    if (getContainerId(info, container) == containerId) {
+      return container;
+    }
+  }
+
+  return None();
+}
+
+
+// Returns the parent endpoint as a URL.
+// TODO(jieyu): Consider using a more reliable way to get the agent v1
+// operator API endpoint URL.
+static inline http::URL extractParentEndpoint(const http::URL& url)
+{
+  http::URL parent = url;
+
+  parent.path = Path(url.path).dirname();
+
+  return parent;
+}
+
+
 class StorageLocalResourceProviderProcess
   : public Process<StorageLocalResourceProviderProcess>
 {
@@ -159,6 +245,9 @@ private:
   void operation(const Event::Operation& operation);
   void publish(const Event::Publish& publish);
 
+  Future<csi::Client> connect(const string& endpoint);
+  Future<csi::Client> getService(const ContainerID& containerId);
+
   enum State
   {
     RECOVERING,
@@ -176,7 +265,14 @@ private:
   const SlaveID slaveId;
   const Option<string> authToken;
 
+  csi::Version csiVersion;
+  process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
+
+  ContainerID controllerContainerId;
+  ContainerID nodeContainerId;
+  hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
+  hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
 };
 
 
@@ -230,6 +326,35 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
+  // Set CSI version to 0.1.0.
+  csiVersion.set_major(0);
+  csiVersion.set_minor(1);
+  csiVersion.set_patch(0);
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    auto it = find(
+        container.services().begin(),
+        container.services().end(),
+        CSIPluginContainerInfo::CONTROLLER_SERVICE);
+    if (it != container.services().end()) {
+      controllerContainerId = getContainerId(info, container);
+      break;
+    }
+  }
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    auto it = find(
+        container.services().begin(),
+        container.services().end(),
+        CSIPluginContainerInfo::NODE_SERVICE);
+    if (it != container.services().end()) {
+      nodeContainerId = getContainerId(info, container);
+      break;
+    }
+  }
+
   const string message =
     "Failed to recover resource provider with type '" + info.type() +
     "' and name '" + info.name() + "'";
@@ -365,6 +490,196 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
 }
 
 
+// Returns a future of a CSI client that waits for the endpoint socket
+// to appear if necessary, then connects to the socket and checks its
+// supported version.
+Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+    const string& endpoint)
+{
+  Future<csi::Client> client;
+
+  if (os::exists(endpoint)) {
+    client = csi::Client("unix://" + endpoint, runtime);
+  } else {
+    // Wait for the endpoint socket to appear until the timeout expires.
+    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+
+    client = loop(
+        self(),
+        [=]() -> Future<Nothing> {
+          if (timeout.expired()) {
+            return Failure("Timed out waiting for endpoint '" + endpoint + "'");
+          }
+
+          return after(Milliseconds(10));
+        },
+        [=](const Nothing&) -> ControlFlow<csi::Client> {
+          if (os::exists(endpoint)) {
+            return Break(csi::Client("unix://" + endpoint, runtime));
+          }
+
+          return Continue();
+        });
+  }
+
+  return client
+    .then(defer(self(), [=](csi::Client client) {
+      return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
+        .then(defer(self(), [=](
+            const csi::GetSupportedVersionsResponse& response)
+            -> Future<csi::Client> {
+          auto it = find(
+              response.supported_versions().begin(),
+              response.supported_versions().end(),
+              csiVersion);
+          if (it == response.supported_versions().end()) {
+            return Failure(
+                "CSI version " + stringify(csiVersion) + " is not supported");
+          }
+
+          return client;
+        }));
+    }));
+}
+
+
+// Returns a future of the latest CSI client for the specified plugin
+// container. If the container is not already running, this method will
+// start a new container daemon.
+Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+    const ContainerID& containerId)
+{
+  if (daemons.contains(containerId)) {
+    CHECK(services.contains(containerId));
+    return services.at(containerId)->future();
+  }
+
+  Option<CSIPluginContainerInfo> config =
+    getCSIPluginContainerInfo(info, containerId);
+
+  CHECK_SOME(config);
+
+  CommandInfo commandInfo;
+
+  if (config->has_command()) {
+    commandInfo.CopyFrom(config->command());
+  }
+
+  // Set the `CSI_ENDPOINT` environment variable.
+  Try<string> endpoint = csi::paths::getEndpointSocketPath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name(),
+      containerId);
+
+  if (endpoint.isError()) {
+    return Failure(
+        "Failed to resolve endpoint path for plugin container '" +
+        stringify(containerId) + "': " + endpoint.error());
+  }
+
+  const string& endpointPath = endpoint.get();
+  Environment::Variable* endpointVar =
+    commandInfo.mutable_environment()->add_variables();
+  endpointVar->set_name("CSI_ENDPOINT");
+  endpointVar->set_value("unix://" + endpointPath);
+
+  ContainerInfo containerInfo;
+
+  if (config->has_container()) {
+    containerInfo.CopyFrom(config->container());
+  } else {
+    containerInfo.set_type(ContainerInfo::MESOS);
+  }
+
+  // Prepare a volume where the endpoint socket will be placed.
+  const string endpointDir = Path(endpointPath).dirname();
+  Volume* endpointVolume = containerInfo.add_volumes();
+  endpointVolume->set_mode(Volume::RW);
+  endpointVolume->set_container_path(endpointDir);
+  endpointVolume->set_host_path(endpointDir);
+
+  // Prepare the directory where the mount points will be placed.
+  const string mountDir = csi::paths::getMountRootDir(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name());
+
+  Try<Nothing> mkdir = os::mkdir(mountDir);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" + mountDir +
+        "': " + mkdir.error());
+  }
+
+  // Prepare a volume where the mount points will be placed.
+  Volume* mountVolume = containerInfo.add_volumes();
+  mountVolume->set_mode(Volume::RW);
+  mountVolume->set_container_path(mountDir);
+  mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
+  mountVolume->mutable_source()->mutable_host_path()->set_path(mountDir);
+  mountVolume->mutable_source()->mutable_host_path()
+    ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
+
+  CHECK(!services.contains(containerId));
+  services[containerId].reset(new Promise<csi::Client>());
+
+  Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
+      extractParentEndpoint(url),
+      authToken,
+      containerId,
+      commandInfo,
+      config->resources(),
+      containerInfo,
+      std::function<Future<Nothing>()>(defer(self(), [=]() {
+        CHECK(services.at(containerId)->future().isPending());
+
+        return connect(endpointPath)
+          .then(defer(self(), [=](const csi::Client& client) {
+            services.at(containerId)->set(client);
+            return Nothing();
+          }))
+          .onFailed(defer(self(), [=](const string& failure) {
+            services.at(containerId)->fail(failure);
+          }))
+          .onDiscarded(defer(self(), [=] {
+            services.at(containerId)->discard();
+          }));
+      })),
+      std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
+        services.at(containerId)->discard();
+        services.at(containerId).reset(new Promise<csi::Client>());
+
+        if (os::exists(endpointPath)) {
+          Try<Nothing> rm = os::rm(endpointPath);
+          if (rm.isError()) {
+            return Failure(
+                "Failed to remove endpoint '" + endpointPath +
+                "': " + rm.error());
+          }
+        }
+
+        return Nothing();
+      })));
+
+  if (daemon.isError()) {
+    return Failure(
+        "Failed to create container daemon for plugin container '" +
+        stringify(containerId) + "': " + daemon.error());
+  }
+
+  const string message =
+    "Container daemon for '" + stringify(containerId) + "' failed";
+
+  daemons[containerId] = daemon.get();
+  daemon.get()->wait()
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+
+  return services.at(containerId)->future();
+}
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 15eb848..8369d19 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -651,6 +651,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
 
 ostream& operator<<(
     ostream& stream,
+    const CSIPluginContainerInfo::Service& service)
+{
+  return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+    ostream& stream,
     const FrameworkInfo::Capability& capability)
 {
   return stream << FrameworkInfo::Capability::Type_Name(capability.type());


[3/5] mesos git commit: Initialized and subscribed storage local resource provider.

Posted by ji...@apache.org.
Initialized and subscribed storage local resource provider.

This patch validates `ResourceProviderInfo` for storage local resource
providers upon creation. During initialization, the storage local
resource provider first tries to recover its ID from the last session
by reading the actual path linked by
`<work_dir>/meta/resource_providers/<type>/<name>/latest`,
then subscribes to the agent's resource provider manager.

This patch is split from https://reviews.apache.org/r/63021/.

Review: https://reviews.apache.org/r/63823/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/619f4ae1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/619f4ae1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/619f4ae1

Branch: refs/heads/master
Commit: 619f4ae1a67d7110097f7b733741314c211e72c0
Parents: 5044610
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:17 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 235 +++++++++++++++++++++++-
 1 file changed, 229 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/619f4ae1/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f586afc..eee857b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>
 
 #include <process/defer.hpp>
+#include <process/delay.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
@@ -28,27 +29,37 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/path.hpp>
+
+#include <stout/os/realpath.hpp>
+
+#include "common/http.hpp"
+
 #include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
 
 #include "resource_provider/detector.hpp"
 
+#include "slave/paths.hpp"
+
 namespace http = process::http;
 
 using std::queue;
 using std::string;
 
+using process::Failure;
+using process::Future;
 using process::Owned;
 using process::Process;
 
 using process::defer;
 using process::spawn;
-using process::terminate;
-using process::wait;
 
 using process::http::authentication::Principal;
 
 using mesos::ResourceProviderInfo;
 
+using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 
 using mesos::v1::resource_provider::Driver;
@@ -73,6 +84,23 @@ static bool isValidName(const string& s)
 }
 
 
+// Returns true if the string is a valid Java package name.
+static bool isValidType(const string& s)
+{
+  if (s.empty()) {
+    return false;
+  }
+
+  foreach (const string& token, strings::split(s, ".")) {
+    if (!isValidName(token)) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
 // Returns a prefix for naming standalone containers to run CSI plugins
 // for the resource provider. The prefix is of the following format:
 //     <rp_type>-<rp_name>--
@@ -100,8 +128,10 @@ public:
       const SlaveID& _slaveId,
       const Option<string>& _authToken)
     : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+      state(RECOVERING),
       url(_url),
       workDir(_workDir),
+      metaDir(slave::paths::getMetaRootDir(_workDir)),
       contentType(ContentType::PROTOBUF),
       info(_info),
       slaveId(_slaveId),
@@ -119,42 +149,75 @@ public:
 
 private:
   void initialize() override;
+  void fatal(const string& messsage, const string& failure);
+
+  Future<Nothing> recover();
+  void doReliableRegistration();
+
+  // Functions for received events.
+  void subscribed(const Event::Subscribed& subscribed);
+  void operation(const Event::Operation& operation);
+  void publish(const Event::Publish& publish);
+
+  enum State
+  {
+    RECOVERING,
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED,
+    READY
+  } state;
 
   const http::URL url;
   const string workDir;
+  const string metaDir;
   const ContentType contentType;
   ResourceProviderInfo info;
   const SlaveID slaveId;
+  const Option<string> authToken;
+
   Owned<v1::resource_provider::Driver> driver;
-  Option<string> authToken;
 };
 
 
 void StorageLocalResourceProviderProcess::connected()
 {
+  CHECK_EQ(DISCONNECTED, state);
+
+  state = CONNECTED;
+
+  doReliableRegistration();
 }
 
 
 void StorageLocalResourceProviderProcess::disconnected()
 {
+  CHECK(state == CONNECTED || state == SUBSCRIBED || state == READY);
+
+  LOG(INFO) << "Disconnected from resource provider manager";
+
+  state = DISCONNECTED;
 }
 
 
 void StorageLocalResourceProviderProcess::received(const Event& event)
 {
-  // TODO(jieyu): Print resource provider ID.
   LOG(INFO) << "Received " << event.type() << " event";
 
   switch (event.type()) {
     case Event::SUBSCRIBED: {
+      CHECK(event.has_subscribed());
+      subscribed(event.subscribed());
       break;
     }
     case Event::OPERATION: {
       CHECK(event.has_operation());
+      operation(event.operation());
       break;
     }
     case Event::PUBLISH: {
       CHECK(event.has_publish());
+      publish(event.publish());
       break;
     }
     case Event::UNKNOWN: {
@@ -167,6 +230,53 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
+  const string message =
+    "Failed to recover resource provider with type '" + info.type() +
+    "' and name '" + info.name() + "'";
+
+  recover()
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+}
+
+
+void StorageLocalResourceProviderProcess::fatal(
+    const string& message,
+    const string& failure)
+{
+  LOG(ERROR) << message << ": " << failure;
+
+  // Force the disconnection early.
+  driver.reset();
+
+  process::terminate(self());
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::recover()
+{
+  CHECK_EQ(RECOVERING, state);
+
+  // Recover the resource provider ID from the latest symlink. If the
+  // symlink does not exist or it points to a nonexistent directory,
+  // treat this as a new resource provider.
+  // TODO(chhsiao): State recovery.
+  Result<string> realpath = os::realpath(
+      slave::paths::getLatestResourceProviderPath(
+          metaDir, slaveId, info.type(), info.name()));
+
+  if (realpath.isError()) {
+    return Failure(
+        "Failed to read the latest symlink for resource provider with type '" +
+        info.type() + "' and name '" + info.name() + "': " + realpath.error());
+  }
+
+  if (realpath.isSome()) {
+    info.mutable_id()->set_value(Path(realpath.get()).basename());
+  }
+
+  state = DISCONNECTED;
+
   driver.reset(new Driver(
       Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
       contentType,
@@ -180,6 +290,78 @@ void StorageLocalResourceProviderProcess::initialize()
         }
       }),
       None())); // TODO(nfnt): Add authentication as part of MESOS-7854.
+
+  return Nothing();
+}
+
+
+void StorageLocalResourceProviderProcess::doReliableRegistration()
+{
+  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
+    return;
+  }
+
+  CHECK_EQ(CONNECTED, state);
+
+  const string message =
+    "Failed to subscribe resource provider with type '" + info.type() +
+    "' and name '" + info.name() + "'";
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+  driver->send(evolve(call));
+
+  // TODO(chhsiao): Consider doing an exponential backoff.
+  delay(Seconds(1), self(), &Self::doReliableRegistration);
+}
+
+
+void StorageLocalResourceProviderProcess::subscribed(
+    const Event::Subscribed& subscribed)
+{
+  CHECK_EQ(CONNECTED, state);
+
+  LOG(INFO) << "Subscribed with ID " << subscribed.provider_id().value();
+
+  state = SUBSCRIBED;
+
+  if (!info.has_id()) {
+    // New subscription.
+    info.mutable_id()->CopyFrom(subscribed.provider_id());
+    slave::paths::createResourceProviderDirectory(
+        metaDir,
+        slaveId,
+        info.type(),
+        info.name(),
+        info.id());
+  }
+}
+
+
+void StorageLocalResourceProviderProcess::operation(
+    const Event::Operation& operation)
+{
+  if (state == SUBSCRIBED) {
+    // TODO(chhsiao): Reject this operation.
+    return;
+  }
+
+  CHECK_EQ(READY, state);
+}
+
+
+void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
+{
+  if (state == SUBSCRIBED) {
+    // TODO(chhsiao): Reject this publish request.
+    return;
+  }
+
+  CHECK_EQ(READY, state);
 }
 
 
@@ -199,6 +381,47 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
         "' does not follow Java package naming convention");
   }
 
+  if (!info.has_storage()) {
+    return Error("'ResourceProviderInfo.storage' must be set");
+  }
+
+  // Verify that the type and name of the CSI plugin follow Java package
+  // naming convention.
+  // TODO(chhsiao): We should move this check to a validation function
+  // for `CSIPluginInfo`.
+  if (!isValidType(info.storage().type()) ||
+      !isValidName(info.storage().name())) {
+    return Error(
+        "CSI plugin type '" + info.storage().type() +
+        "' and name '" + info.storage().name() +
+        "' does not follow Java package naming convention");
+  }
+
+  bool hasControllerService = false;
+  bool hasNodeService = false;
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    for (int i = 0; i < container.services_size(); i++) {
+      const CSIPluginContainerInfo::Service service = container.services(i);
+      if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) {
+        hasControllerService = true;
+      } else if (service == CSIPluginContainerInfo::NODE_SERVICE) {
+        hasNodeService = true;
+      }
+    }
+  }
+
+  if (!hasControllerService) {
+    return Error(
+        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
+  }
+
+  if (!hasNodeService) {
+    return Error(
+        stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");
+  }
+
   return Owned<LocalResourceProvider>(
       new StorageLocalResourceProvider(url, workDir, info, slaveId, authToken));
 }
@@ -228,8 +451,8 @@ StorageLocalResourceProvider::StorageLocalResourceProvider(
 
 StorageLocalResourceProvider::~StorageLocalResourceProvider()
 {
-  terminate(process.get());
-  wait(process.get());
+  process::terminate(process.get());
+  process::wait(process.get());
 }
 
 } // namespace internal {


[4/5] mesos git commit: Added the `ResourceProviderState` protobuf for resource providers.

Posted by ji...@apache.org.
Added the `ResourceProviderState` protobuf for resource providers.

The `ResourceProviderState` protobuf includes the list of pending offer
operations, the total resources, and the current resource version UUID.
Note that the pending operations do not include completed operations
that have not been acknowledged yet.

Review: https://reviews.apache.org/r/64075/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/089e789b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/089e789b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/089e789b

Branch: refs/heads/master
Commit: 089e789b7820de6b3fde3a94c98fb1d89194a618
Parents: 3adab34
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:12:11 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt                         |  1 +
 src/Makefile.am                            |  6 ++-
 src/resource_provider/registry.hpp         |  2 +-
 src/resource_provider/state.hpp            | 24 ++++++++++
 src/resource_provider/state.proto          | 51 +++++++++++++++++++++
 src/resource_provider/storage/provider.cpp | 60 ++++++++++++++++++++++++-
 6 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 15cda10..318cb40 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,7 @@ PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/docker/volum
 PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/provisioner/docker/message)
 PROTOC_GENERATE(INTERNAL TARGET master/registry)
 PROTOC_GENERATE(INTERNAL TARGET resource_provider/registry)
+PROTOC_GENERATE(INTERNAL TARGET resource_provider/state)
 
 
 # BUILD PROTOBUFS.

http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3444388..08d29ab 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -366,6 +366,8 @@ CXX_PROTOS +=								\
   messages/messages.pb.h						\
   resource_provider/registry.pb.cc					\
   resource_provider/registry.pb.h					\
+  resource_provider/state.pb.cc						\
+  resource_provider/state.pb.h						\
   slave/containerizer/mesos/provisioner/docker/message.pb.cc		\
   slave/containerizer/mesos/provisioner/docker/message.pb.h		\
   slave/containerizer/mesos/isolators/docker/volume/state.pb.cc		\
@@ -930,6 +932,8 @@ libmesos_no_3rdparty_la_SOURCES =					\
   master/registry.proto							\
   messages/flags.proto							\
   messages/messages.proto						\
+  resource_provider/registry.proto					\
+  resource_provider/state.proto						\
   slave/containerizer/mesos/provisioner/docker/message.proto		\
   slave/containerizer/mesos/isolators/docker/volume/state.proto		\
   slave/containerizer/mesos/isolators/network/cni/spec.proto
@@ -1153,7 +1157,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/message.hpp						\
   resource_provider/registrar.hpp					\
   resource_provider/registry.hpp					\
-  resource_provider/registry.proto					\
+  resource_provider/state.hpp						\
   resource_provider/validation.hpp					\
   sched/constants.hpp							\
   sched/flags.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/resource_provider/registry.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.hpp b/src/resource_provider/registry.hpp
index 048cd6b..4c6c4d4 100644
--- a/src/resource_provider/registry.hpp
+++ b/src/resource_provider/registry.hpp
@@ -19,6 +19,6 @@
 #define __RESOURCE_PROVIDER_REGISTRY_HPP__
 
 // ONLY USEFUL AFTER RUNNING PROTOC.
-#include <resource_provider/registry.pb.h>
+#include "resource_provider/registry.pb.h"
 
 #endif // __RESOURCE_PROVIDER_REGISTRY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/resource_provider/state.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/state.hpp b/src/resource_provider/state.hpp
new file mode 100644
index 0000000..ab1318b
--- /dev/null
+++ b/src/resource_provider/state.hpp
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef __RESOURCE_PROVIDER_STATE_HPP__
+#define __RESOURCE_PROVIDER_STATE_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "resource_provider/state.pb.h"
+
+#endif // __RESOURCE_PROVIDER_STATE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/resource_provider/state.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/state.proto b/src/resource_provider/state.proto
new file mode 100644
index 0000000..ae30d16
--- /dev/null
+++ b/src/resource_provider/state.proto
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+import "mesos/mesos.proto";
+
+import "mesos/resource_provider/resource_provider.proto";
+
+package mesos.resource_provider.state;
+
+option java_package = "org.apache.mesos.resource_provider.state";
+option java_outer_classname = "Protos";
+
+
+message ResourceProviderState {
+  // This includes only pending operations. Operations that have
+  // unacknowledged statuses should be recovered through the status
+  // update manager.
+  repeated Event.Operation operations = 1;
+
+  // The total resources provided by this resource provider.
+  repeated Resource resources = 2;
+
+  // Used to establish the relationship between the operation and
+  // the resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when
+  // it believes that the resources from this resource provider are
+  // out of sync from the master's view. The master will keep track
+  // of the last known resource version UUID for each resource
+  // provider, and attach the resource version UUID in each
+  // operation it sends out. The resource provider should reject
+  // operations that have a different resource version UUID than
+  // that it maintains, because this means the operation is
+  // operating on resources that might have already been
+  // invalidated.
+  required bytes resource_version_uuid = 3;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/089e789b/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a57ef15..d35b0d0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -30,6 +30,7 @@
 #include <process/process.hpp>
 #include <process/timeout.hpp>
 
+#include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
 #include <mesos/resource_provider/resource_provider.hpp>
@@ -56,6 +57,7 @@
 #include "internal/evolve.hpp"
 
 #include "resource_provider/detector.hpp"
+#include "resource_provider/state.hpp"
 
 #include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
@@ -91,6 +93,8 @@ using mesos::internal::slave::ContainerDaemon;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 
+using mesos::resource_provider::state::ResourceProviderState;
+
 using mesos::v1::resource_provider::Driver;
 
 namespace mesos {
@@ -272,6 +276,8 @@ private:
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
 
+  void checkpointResourceProviderState();
+
   enum State
   {
     RECOVERING,
@@ -302,6 +308,10 @@ private:
   Option<csi::GetPluginInfoResponse> nodeInfo;
   Option<csi::ControllerCapabilities> controllerCapabilities;
   Option<string> nodeId;
+
+  list<Event::Operation> pendingOperations;
+  Resources totalResources;
+  string resourceVersion;
 };
 
 
@@ -428,8 +438,34 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
             ": " + realpath.error());
       }
 
-      if (realpath.isSome()) {
+      if (realpath.isNone()) {
+        resourceVersion = UUID::random().toBytes();
+      } else {
         info.mutable_id()->set_value(Path(realpath.get()).basename());
+
+        const string statePath = slave::paths::getResourceProviderStatePath(
+            metaDir, slaveId, info.type(), info.name(), info.id());
+
+        Result<ResourceProviderState> resourceProviderState =
+          ::protobuf::read<ResourceProviderState>(statePath);
+
+        if (resourceProviderState.isError()) {
+          return Failure(
+              "Failed to read resource provider state from '" + statePath +
+              "': " + resourceProviderState.error());
+        }
+
+        if (resourceProviderState.isNone()) {
+          resourceVersion = UUID::random().toBytes();
+        } else {
+          foreach (const Event::Operation& operation,
+                   resourceProviderState->operations()) {
+            pendingOperations.push_back(operation);
+          }
+
+          totalResources = resourceProviderState->resources();
+          resourceVersion = resourceProviderState->resource_version_uuid();
+        }
       }
 
       state = DISCONNECTED;
@@ -589,6 +625,9 @@ void StorageLocalResourceProviderProcess::operation(
   }
 
   CHECK_EQ(READY, state);
+
+  pendingOperations.push_back(operation);
+  checkpointResourceProviderState();
 }
 
 
@@ -970,6 +1009,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
+void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
+{
+  ResourceProviderState state;
+
+  foreach (const Event::Operation& operation, pendingOperations) {
+    state.add_operations()->CopyFrom(operation);
+  }
+
+  state.mutable_resources()->CopyFrom(totalResources);
+  state.set_resource_version_uuid(resourceVersion);
+
+  const string statePath = slave::paths::getResourceProviderStatePath(
+      metaDir, slaveId, info.type(), info.name(), info.id());
+
+  CHECK_SOME(slave::state::checkpoint(statePath, state))
+    << "Failed to checkpoint resource provider state to '" << statePath << "'";
+}
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,