You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:40 UTC

[10/12] mesos git commit: Dropping instead of failing offer operations with mismatched versions.

Dropping instead of failing offer operations with mismatched versions.

Review: https://reviews.apache.org/r/64692/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db4a6a55
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db4a6a55
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db4a6a55

Branch: refs/heads/master
Commit: db4a6a55b1a05c42e8a89f03035db709192c1c07
Parents: a36e2ec
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:26 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 137 +++++++++++++++---------
 1 file changed, 86 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/db4a6a55/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ab8c711..772952e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -382,6 +382,11 @@ private:
   Future<Resources> getCapacities();
 
   Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
+  void dropOfferOperation(
+      const id::UUID& operationUuid,
+      const Option<FrameworkID>& frameworkId,
+      const Option<OfferOperationID>& operationId,
+      const string& message);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
       const Resource& resource,
@@ -1359,47 +1364,58 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
     << "Received " << operation.info().type() << " operation '"
     << operation.info().id() << "' (uuid: " << uuid.get() << ")";
 
-  CHECK(!offerOperations.contains(uuid.get()));
-  offerOperations[uuid.get()] = protobuf::createOfferOperation(
-      operation.info(),
-      protobuf::createOfferOperationStatus(
-          OFFER_OPERATION_PENDING,
-          operation.info().has_id()
-            ? operation.info().id() : Option<OfferOperationID>::none()),
-      operation.has_framework_id()
-        ? operation.framework_id() : Option<FrameworkID>::none(),
-      slaveId,
-      uuid.get());
+  Option<FrameworkID> frameworkId = operation.has_framework_id()
+    ? operation.framework_id() : Option<FrameworkID>::none();
+  Option<OfferOperationID> operationId = operation.info().has_id()
+    ? operation.info().id() : Option<OfferOperationID>::none();
 
-  checkpointResourceProviderState();
+  if (state == SUBSCRIBED) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
+        "Cannot apply offer operation in SUBSCRIBED state");
+  }
 
-  Future<Nothing> result;
+  if (reconciling) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
+        "Cannot apply offer operation when reconciling storage pools");
+  }
 
   Try<id::UUID> operationVersion =
     id::UUID::fromBytes(operation.resource_version_uuid().value());
-
   CHECK_SOME(operationVersion);
 
-  if (state == SUBSCRIBED) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
-        "Cannot apply offer operation in SUBSCRIBED state"));
-  } else if (reconciling) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
-        "Cannot apply offer operation when reconciling storage pools"));
-  } else if (operationVersion.get() != resourceVersion) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
+  if (operationVersion.get() != resourceVersion) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
         "Mismatched resource version " + stringify(operationVersion.get()) +
-        " (expected: " + stringify(resourceVersion) + ")"));
-  } else {
-    result = _applyOfferOperation(uuid.get());
+        " (expected: " + stringify(resourceVersion) + ")");
   }
 
+  CHECK(!offerOperations.contains(uuid.get()));
+  offerOperations[uuid.get()] = protobuf::createOfferOperation(
+      operation.info(),
+      protobuf::createOfferOperationStatus(
+          OFFER_OPERATION_PENDING,
+          operationId),
+      frameworkId,
+      slaveId,
+      uuid.get());
+
+  checkpointResourceProviderState();
+
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
       << "Failed to apply offer operation (uuid: " << uuid << "): " << message;
   };
 
-  result
+  _applyOfferOperation(uuid.get())
     .onFailed(std::bind(err, uuid.get(), lambda::_1))
     .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
@@ -1612,29 +1628,11 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations(
       continue;
     }
 
-    OfferOperationStatusUpdate update =
-      protobuf::createOfferOperationStatusUpdate(
-          uuid.get(),
-          protobuf::createOfferOperationStatus(
-              OFFER_OPERATION_DROPPED,
-              None(),
-              None(),
-              None(),
-              id::UUID::random()),
-          None(),
-          None(),
-          slaveId);
-
-    auto die = [=](const string& message) {
-      LOG(ERROR)
-        << "Failed to update status of offer operation (uuid: " << uuid.get()
-        << "): " << message;
-      fatal();
-    };
-
-    statusUpdateManager.update(std::move(update), false)
-      .onFailed(defer(self(), std::bind(die, lambda::_1)))
-      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+    dropOfferOperation(
+        uuid.get(),
+        None(),
+        None(),
+        "Unknown offer operation");
   }
 }
 
@@ -2551,7 +2549,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 }
 
 
-// Applies the offer operation. Conventional operations will be
+// Applies the offer operation. Speculative operations will be
 // synchronously applied. Do nothing if the operation is already in a
 // terminal state.
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
@@ -2570,7 +2568,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY: {
-      // Synchronously apply the conventional operations to ensure that
+      // Synchronously apply the speculative operations to ensure that
       // its result is reflected in the total resources before any of
       // its succeeding operations is applied.
       return updateOfferOperationStatus(
@@ -2660,6 +2658,43 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
 }
 
 
+// Logs a warning and sends `OFFER_OPERATION_DROPPED` with `message`,
+// without checkpointing the operation; calls `fatal()` if that fails.
+void StorageLocalResourceProviderProcess::dropOfferOperation(
+    const id::UUID& operationUuid,
+    const Option<FrameworkID>& frameworkId,
+    const Option<OfferOperationID>& operationId,
+    const string& message)
+{
+  LOG(WARNING)
+    << "Dropping offer operation (uuid: " << operationUuid << "): " << message;
+
+  OfferOperationStatusUpdate update =
+    protobuf::createOfferOperationStatusUpdate(
+       operationUuid,
+       protobuf::createOfferOperationStatus(
+           OFFER_OPERATION_DROPPED,
+           operationId,
+           message,
+           None(),
+           id::UUID::random()),
+       None(),
+       frameworkId,
+       slaveId);
+
+  auto die = [=](const string& message) { // shadows outer `message`
+    LOG(ERROR)
+      << "Failed to update status of offer operation (uuid: " << operationUuid
+      << "): " << message;
+    fatal();
+  };
+
+  statusUpdateManager.update(std::move(update), false)
+    .onFailed(defer(self(), std::bind(die, lambda::_1)))
+    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+}
+
+
 Future<vector<ResourceConversion>>
 StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     const Resource& resource,