You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/14 01:11:07 UTC

[2/3] mesos git commit: Initialized offer operation status update manager in SLRP.

Initialized offer operation status update manager in SLRP.

This patch adds an agent filesystem layout for checkpointing offer
operation status updates for resource providers, and initializes
a status update manager in the storage local resource provider.

Review: https://reviews.apache.org/r/64475/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3100e9aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3100e9aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3100e9aa

Branch: refs/heads/master
Commit: 3100e9aa0ac9b6bcc92643b145e2730fc862ea39
Parents: c728f8e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Dec 13 16:02:33 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:57 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 130 +++++++++++++++++++-----
 src/slave/paths.cpp                        |  54 ++++++++++
 src/slave/paths.hpp                        |  23 +++++
 3 files changed, 179 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e806f44..2fd4a3b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -66,6 +66,8 @@
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
+#include "status_update_manager/offer_operation.hpp"
+
 namespace http = process::http;
 
 using std::accumulate;
@@ -327,6 +329,7 @@ private:
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
+  Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
   Future<Nothing> reconcile();
 
@@ -359,8 +362,8 @@ private:
   Future<Nothing> deleteVolume(const string& volumeId);
 
   // Applies the offer operation. Conventional operations will be
-  // synchoronusly applied.
-  Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+  // synchronously applied.
+  Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
       const Resource& resource,
@@ -375,9 +378,15 @@ private:
       const Try<vector<ResourceConversion>>& conversions);
 
   void checkpointResourceProviderState();
-  void sendResourceProviderStateUpdate();
   void checkpointVolumeState(const string& volumeId);
 
+  void sendResourceProviderStateUpdate();
+
+  // NOTE: This is a callback for the status update manager and should
+  // not be called directly.
+  void sendOfferOperationStatusUpdate(
+      const OfferOperationStatusUpdate& statusUpdate);
+
   enum State
   {
     RECOVERING,
@@ -400,6 +409,7 @@ private:
   hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
+  OfferOperationStatusUpdateManager statusUpdateManager;
 
   ContainerID controllerContainerId;
   ContainerID nodeContainerId;
@@ -611,15 +621,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
       // We replay all pending operations here, so that if a volume is
       // actually created before the last failover, it will be reflected
       // in the updated total resources before we do the reconciliation.
-      // NOTE: `applyOfferOperation` will remove the applied operation
+      // NOTE: `_applyOfferOperation` will remove the applied operation
       // from the list of pending operations, so we make a copy of keys
       // here.
       foreach (const UUID& uuid, pendingOperations.keys()) {
-        applyOfferOperation(uuid)
+        _applyOfferOperation(uuid)
           .onAny(defer(self(), [=](const Future<Nothing>& future) {
             if (!future.isReady()) {
               LOG(ERROR)
-                << "Failed to apply operation " << uuid << ": "
+                << "Failed to apply offer operation with UUID " << uuid << ": "
                 << (future.isFailed() ? future.failure() : "future discarded");
             }
           }));
@@ -841,6 +851,27 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 }
 
 
+Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+{
+  CHECK(info.has_id());
+
+  const string resourceProviderDir = slave::paths::getResourceProviderPath(
+      metaDir, slaveId, info.type(), info.name(), info.id());
+
+  statusUpdateManager.initialize(
+      defer(self(), &Self::sendOfferOperationStatusUpdate, lambda::_1),
+      std::bind(
+          &slave::paths::getOfferOperationUpdatesPath,
+          resourceProviderDir,
+          lambda::_1));
+
+  statusUpdateManager.pause();  // Resumed later in `reconcile()`.
+
+  // TODO(chhsiao): Recover status updates.
+  return Nothing();
+}
+
+
 void StorageLocalResourceProviderProcess::doReliableRegistration()
 {
   if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -872,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration()
 
 Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
 {
-  return importResources()
+  return recoverStatusUpdates()
+    .then(defer(self(), &Self::importResources))
     .then(defer(self(), [=](Resources importedResources) {
       // NODE: If a resource in the checkpointed total resources is
       // missing in the imported resources, we will still keep it if it
@@ -945,6 +977,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
       }
 
       sendResourceProviderStateUpdate();
+      statusUpdateManager.resume();
 
       state = READY;
 
@@ -1005,7 +1038,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
   pendingOperations[uuid.get()] = operation;
   checkpointResourceProviderState();
 
-  applyOfferOperation(uuid.get())
+  _applyOfferOperation(uuid.get())
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       if (!future.isReady()) {
         LOG(ERROR)
@@ -1942,8 +1975,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 
           if (volumes.contains(volumeInfo.id())) {
             // The resource provider failed over after the last
-            // `CreateVolume` call, but before the operation status
-            // was checkpointed.
+            // `CreateVolume` call, but before the offer operation
+            // status was checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
                      volumes.at(volumeInfo.id()).state.state());
           } else {
@@ -2027,7 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     }
   } else {
     // The resource provider failed over after the last `DeleteVolume`
-    // call, but before the operation status was checkpointed.
+    // call, but before the offer operation status was checkpointed.
     CHECK(!os::exists(volumePath));
   }
 
@@ -2038,7 +2071,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     const UUID& operationUuid)
 {
   Future<vector<ResourceConversion>> conversions;
@@ -2205,7 +2238,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     created = resource.disk().source().id();
   } else {
     // We use the operation UUID as the name of the volume, so the same
-    // operation will create the same volume after recovery.
+    // offer operation will create the same volume after recovery.
     // TODO(chhsiao): Call `CreateVolume` sequentially with other create
     // or delete operations.
     // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
@@ -2389,8 +2422,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
 
   auto err = [](const UUID& operationUuid, const string& message) {
     LOG(ERROR)
-      << "Failed to send status update for offer operation " << operationUuid
-      << ": " << message;
+      << "Failed to send status update for offer operation with UUID "
+      << operationUuid << ": " << message;
   };
 
   driver->send(evolve(call))
@@ -2426,8 +2459,28 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, state))
-    << "Failed to checkpoint resource provider state to '" << statePath << "'";
+  Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint resource provider state to '" << statePath << "': "
+    << checkpoint.error();
+}
+
+
+void StorageLocalResourceProviderProcess::checkpointVolumeState(
+    const string& volumeId)
+{
+  const string statePath = csi::paths::getVolumeStatePath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      volumeId);
+
+  Try<Nothing> checkpoint =
+    slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint volume state to '" << statePath << "': "
+    << checkpoint.error();
 }
 
 
@@ -2443,8 +2496,8 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
                const Event::ApplyOfferOperation& operation,
                pendingOperations) {
     // TODO(chhsiao): Maintain a list of terminated but unacknowledged
-    // offer operations in memory and reconstruc that during recovery
-    // by querying status update manager.
+    // offer operations in memory and reconstruct it during recovery
+    // by querying the status update manager.
     update->add_operations()->CopyFrom(
         protobuf::createOfferOperation(
             operation.info(),
@@ -2475,17 +2528,38 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 }
 
 
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
-    const string& volumeId)
+void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate(
+    const OfferOperationStatusUpdate& statusUpdate)
 {
-  const string statePath = csi::paths::getVolumeStatePath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      volumeId);
+  Call call;
+  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state))
-    << "Failed to checkpoint volume state to '" << statePath << "'";
+  Call::UpdateOfferOperationStatus* update =
+    call.mutable_update_offer_operation_status();
+  update->set_operation_uuid(statusUpdate.operation_uuid());
+  update->mutable_status()->CopyFrom(statusUpdate.status());
+
+  if (statusUpdate.has_framework_id()) {
+    update->mutable_framework_id()->CopyFrom(statusUpdate.framework_id());
+  }
+
+  // The latest status should have been set by the status update manager.
+  CHECK(statusUpdate.has_latest_status());
+  update->mutable_latest_status()->CopyFrom(statusUpdate.latest_status());
+
+  auto err = [](const UUID& uuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to send status update for offer operation with UUID " << uuid
+      << ": " << message;
+  };
+
+  Try<UUID> uuid = UUID::fromBytes(statusUpdate.operation_uuid());
+  CHECK_SOME(uuid);
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, uuid.get(), lambda::_1))
+    .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index b8004e7..f9f0c78 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -65,6 +65,7 @@ const char TASK_UPDATES_FILE[] = "task.updates";
 const char RESOURCES_INFO_FILE[] = "resources.info";
 const char RESOURCES_TARGET_FILE[] = "resources.target";
 const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state";
+const char OFFER_OPERATION_UPDATES_FILE[] = "operation.updates";
 
 
 const char CONTAINERS_DIR[] = "containers";
@@ -75,6 +76,7 @@ const char EXECUTORS_DIR[] = "executors";
 const char EXECUTOR_RUNS_DIR[] = "runs";
 const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
 const char RESOURCE_PROVIDERS_DIR[] = "resource_providers";
+const char OFFER_OPERATIONS_DIR[] = "operations";
 
 
 Try<ExecutorRunPath> parseExecutorRunPath(
@@ -545,6 +547,58 @@ string getLatestResourceProviderPath(
 }
 
 
+Try<list<string>> getOfferOperationPaths(
+    const string& rootDir)
+{
+  return fs::list(path::join(rootDir, OFFER_OPERATIONS_DIR, "*"));
+}
+
+
+string getOfferOperationPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(rootDir, OFFER_OPERATIONS_DIR, operationUuid.toString());
+}
+
+
+Try<UUID> parseOfferOperationPath(
+    const string& rootDir,
+    const string& dir)
+{
+  // TODO(chhsiao): Consider using `<regex>`, which requires GCC 4.9+.
+
+  // Make sure there's a separator at the end of the prefix so that we
+  // don't accidentally slice off part of a directory.
+  const string prefix = path::join(rootDir, OFFER_OPERATIONS_DIR, "");
+
+  if (!strings::startsWith(dir, prefix)) {
+    return Error(
+        "Directory '" + dir + "' does not fall under operations directory '" +
+        prefix + "'");
+  }
+
+  Try<UUID> operationUuid = UUID::fromString(Path(dir).basename());
+  if (operationUuid.isError()) {
+    return Error(
+        "Could not decode offer operation UUID from string '" +
+        Path(dir).basename() + "': " + operationUuid.error());
+  }
+
+  return operationUuid.get();
+}
+
+
+string getOfferOperationUpdatesPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(
+      getOfferOperationPath(rootDir, operationUuid),
+      OFFER_OPERATION_UPDATES_FILE);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d645d87..bae68d0 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -23,6 +23,7 @@
 #include <mesos/mesos.hpp>
 
 #include <stout/try.hpp>
+#include <stout/uuid.hpp>
 
 namespace mesos {
 namespace internal {
@@ -79,6 +80,9 @@ namespace paths {
 //   |           |           |-- latest (symlink)
 //   |           |           |-- <resource_provider_id>
 //   |           |               |-- resource_provider.state
+//   |           |               |-- operations
+//   |           |                   |-- <operation_uuid>
+//   |           |                       |-- operation.updates
 //   |           |-- frameworks
 //   |               |-- <framework_id>
 //   |                   |-- framework.info
@@ -349,6 +353,25 @@ std::string getLatestResourceProviderPath(
     const std::string& resourceProviderName);
 
 
+Try<std::list<std::string>> getOfferOperationPaths(
+    const std::string& rootDir);
+
+
+std::string getOfferOperationPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
+Try<UUID> parseOfferOperationPath(
+    const std::string& rootDir,
+    const std::string& dir);
+
+
+std::string getOfferOperationUpdatesPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);