You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:40 UTC
[10/12] mesos git commit: Dropping instead of failing offer operations with mismatched versions.
Dropping instead of failing offer operations with mismatched versions.
Review: https://reviews.apache.org/r/64692/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db4a6a55
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db4a6a55
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db4a6a55
Branch: refs/heads/master
Commit: db4a6a55b1a05c42e8a89f03035db709192c1c07
Parents: a36e2ec
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:26 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 137 +++++++++++++++---------
1 file changed, 86 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/db4a6a55/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ab8c711..772952e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -382,6 +382,11 @@ private:
Future<Resources> getCapacities();
Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
+ void dropOfferOperation(
+ const id::UUID& operationUuid,
+ const Option<FrameworkID>& frameworkId,
+ const Option<OfferOperationID>& operationId,
+ const string& message);
Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
const Resource& resource,
@@ -1359,47 +1364,58 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
<< "Received " << operation.info().type() << " operation '"
<< operation.info().id() << "' (uuid: " << uuid.get() << ")";
- CHECK(!offerOperations.contains(uuid.get()));
- offerOperations[uuid.get()] = protobuf::createOfferOperation(
- operation.info(),
- protobuf::createOfferOperationStatus(
- OFFER_OPERATION_PENDING,
- operation.info().has_id()
- ? operation.info().id() : Option<OfferOperationID>::none()),
- operation.has_framework_id()
- ? operation.framework_id() : Option<FrameworkID>::none(),
- slaveId,
- uuid.get());
+ Option<FrameworkID> frameworkId = operation.has_framework_id()
+ ? operation.framework_id() : Option<FrameworkID>::none();
+ Option<OfferOperationID> operationId = operation.info().has_id()
+ ? operation.info().id() : Option<OfferOperationID>::none();
- checkpointResourceProviderState();
+ if (state == SUBSCRIBED) {
+ return dropOfferOperation(
+ uuid.get(),
+ frameworkId,
+ operationId,
+ "Cannot apply offer operation in SUBSCRIBED state");
+ }
- Future<Nothing> result;
+ if (reconciling) {
+ return dropOfferOperation(
+ uuid.get(),
+ frameworkId,
+ operationId,
+ "Cannot apply offer operation when reconciling storage pools");
+ }
Try<id::UUID> operationVersion =
id::UUID::fromBytes(operation.resource_version_uuid().value());
-
CHECK_SOME(operationVersion);
- if (state == SUBSCRIBED) {
- result = updateOfferOperationStatus(uuid.get(), Error(
- "Cannot apply offer operation in SUBSCRIBED state"));
- } else if (reconciling) {
- result = updateOfferOperationStatus(uuid.get(), Error(
- "Cannot apply offer operation when reconciling storage pools"));
- } else if (operationVersion.get() != resourceVersion) {
- result = updateOfferOperationStatus(uuid.get(), Error(
+ if (operationVersion.get() != resourceVersion) {
+ return dropOfferOperation(
+ uuid.get(),
+ frameworkId,
+ operationId,
"Mismatched resource version " + stringify(operationVersion.get()) +
- " (expected: " + stringify(resourceVersion) + ")"));
- } else {
- result = _applyOfferOperation(uuid.get());
+ " (expected: " + stringify(resourceVersion) + ")");
}
+ CHECK(!offerOperations.contains(uuid.get()));
+ offerOperations[uuid.get()] = protobuf::createOfferOperation(
+ operation.info(),
+ protobuf::createOfferOperationStatus(
+ OFFER_OPERATION_PENDING,
+ operationId),
+ frameworkId,
+ slaveId,
+ uuid.get());
+
+ checkpointResourceProviderState();
+
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to apply offer operation (uuid: " << uuid << "): " << message;
};
- result
+ _applyOfferOperation(uuid.get())
.onFailed(std::bind(err, uuid.get(), lambda::_1))
.onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
@@ -1612,29 +1628,11 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations(
continue;
}
- OfferOperationStatusUpdate update =
- protobuf::createOfferOperationStatusUpdate(
- uuid.get(),
- protobuf::createOfferOperationStatus(
- OFFER_OPERATION_DROPPED,
- None(),
- None(),
- None(),
- id::UUID::random()),
- None(),
- None(),
- slaveId);
-
- auto die = [=](const string& message) {
- LOG(ERROR)
- << "Failed to update status of offer operation (uuid: " << uuid.get()
- << "): " << message;
- fatal();
- };
-
- statusUpdateManager.update(std::move(update), false)
- .onFailed(defer(self(), std::bind(die, lambda::_1)))
- .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+ dropOfferOperation(
+ uuid.get(),
+ None(),
+ None(),
+ "Unknown offer operation");
}
}
@@ -2551,7 +2549,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
}
-// Applies the offer operation. Conventional operations will be
+// Applies the offer operation. Speculative operations will be
// synchronously applied. Do nothing if the operation is already in a
// terminal state.
Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
@@ -2570,7 +2568,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
case Offer::Operation::UNRESERVE:
case Offer::Operation::CREATE:
case Offer::Operation::DESTROY: {
- // Synchronously apply the conventional operations to ensure that
+ // Synchronously apply the speculative operations to ensure that
// its result is reflected in the total resources before any of
// its succeeding operations is applied.
return updateOfferOperationStatus(
@@ -2660,6 +2658,43 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
}
+// Sends `OFFER_OPERATION_DROPPED` without checkpointing the status of
+// the offer operation.
+void StorageLocalResourceProviderProcess::dropOfferOperation(
+ const id::UUID& operationUuid,
+ const Option<FrameworkID>& frameworkId,
+ const Option<OfferOperationID>& operationId,
+ const string& message)
+{
+ LOG(WARNING)
+ << "Dropping offer operation (uuid: " << operationUuid << "): " << message;
+
+ OfferOperationStatusUpdate update =
+ protobuf::createOfferOperationStatusUpdate(
+ operationUuid,
+ protobuf::createOfferOperationStatus(
+ OFFER_OPERATION_DROPPED,
+ operationId,
+ message,
+ None(),
+ id::UUID::random()),
+ None(),
+ frameworkId,
+ slaveId);
+
+ auto die = [=](const string& message) {
+ LOG(ERROR)
+ << "Failed to update status of offer operation (uuid: " << operationUuid
+ << "): " << message;
+ fatal();
+ };
+
+ statusUpdateManager.update(std::move(update), false)
+ .onFailed(defer(self(), std::bind(die, lambda::_1)))
+ .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+}
+
+
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
const Resource& resource,