You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/06 23:06:51 UTC

[01/14] mesos git commit: Updated the offer operation bookkeeping in the master.

Repository: mesos
Updated Branches:
  refs/heads/master a94e3cd0f -> 2619824c9


Updated the offer operation bookkeeping in the master.

(This is based on https://reviews.apache.org/r/63539)

Review: https://reviews.apache.org/r/63588


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a30e9e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a30e9e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a30e9e4

Branch: refs/heads/master
Commit: 0a30e9e49b8f71b90602b8d3f6fb8494ce467c43
Parents: 3245ed5
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Nov 3 15:27:25 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto    |  11 +-
 include/mesos/v1/mesos.proto |  11 +-
 src/master/master.cpp        | 309 +++++++++++++++++++++++++++++++-------
 src/master/master.hpp        | 168 ++++++++++++++++++++-
 src/messages/messages.proto  |   4 +-
 5 files changed, 432 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0a30e9e4/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 26e9b6c..1f0b149 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2181,19 +2181,20 @@ message TaskResourceLimitation {
  * some additional information.
  */
 message OfferOperation {
-  required FrameworkID framework_id = 1;
-  required Offer.Operation info = 2;
-  required OfferOperationStatus latest_status = 3;
+  optional FrameworkID framework_id = 1;
+  optional SlaveID slave_id = 2;
+  required Offer.Operation info = 3;
+  required OfferOperationStatus latest_status = 4;
 
   // All the statuses known to this offer operation. Some of the
   // statuses in this list might not have been acknowledged yet. The
   // statuses are ordered.
-  repeated OfferOperationStatus statuses = 4;
+  repeated OfferOperationStatus statuses = 5;
 
   // This is the internal UUID for the operation, which is kept
   // independently from the framework-specified operation ID, which is
   // optional.
-  required bytes operation_uuid = 5;
+  required bytes operation_uuid = 6;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0a30e9e4/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 7770b0a..6600635 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2162,19 +2162,20 @@ message TaskResourceLimitation {
  * some additional information.
  */
 message OfferOperation {
-  required FrameworkID framework_id = 1;
-  required Offer.Operation info = 2;
-  required OfferOperationStatus latest_status = 3;
+  optional FrameworkID framework_id = 1;
+  optional AgentID agent_id = 2;
+  required Offer.Operation info = 3;
+  required OfferOperationStatus latest_status = 4;
 
   // All the statuses known to this offer operation. Some of the
   // statuses in this list might not have been acknowledged yet. The
   // statuses are ordered.
-  repeated OfferOperationStatus statuses = 4;
+  repeated OfferOperationStatus statuses = 5;
 
   // This is the internal UUID for the operation, which is kept
   // independently from the framework-specified operation ID, which is
   // optional.
-  required bytes operation_uuid = 5;
+  required bytes operation_uuid = 6;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0a30e9e4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 613c9eb..c7aadb1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4560,7 +4560,7 @@ void Master::_accept(
                   << operation.reserve().resources() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        _apply(slave, operation);
+        _apply(slave, framework, operation);
 
         conversions.insert(
             conversions.end(),
@@ -4626,7 +4626,7 @@ void Master::_accept(
                   << operation.unreserve().resources() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        _apply(slave, operation);
+        _apply(slave, framework, operation);
 
         conversions.insert(
             conversions.end(),
@@ -4703,7 +4703,7 @@ void Master::_accept(
                   << operation.create().volumes() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        _apply(slave, operation);
+        _apply(slave, framework, operation);
 
         conversions.insert(
             conversions.end(),
@@ -4796,7 +4796,7 @@ void Master::_accept(
                   << operation.destroy().volumes() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        _apply(slave, operation);
+        _apply(slave, framework, operation);
 
         conversions.insert(
             conversions.end(),
@@ -7342,13 +7342,21 @@ void Master::offerOperationStatusUpdate(
     << "External resource provider is not supported yet";
 
   const SlaveID& slaveId = update.slave_id();
-  const FrameworkID& frameworkId = update.framework_id();
+
+  // The status update for the offer operation might be for an
+  // operator API call, thus the framework ID here is optional.
+  Option<FrameworkID> frameworkId = update.has_framework_id()
+    ? update.framework_id()
+    : Option<FrameworkID>::none();
 
   Try<UUID> uuid = UUID::fromString(update.operation_uuid());
   if (uuid.isError()) {
     LOG(ERROR) << "Failed to parse offer operation UUID for operation "
-               << "'" << update.status().operation_id() << "' "
-               << "from framework " << frameworkId << ": " << uuid.error();
+               << "'" << update.status().operation_id() << "' for "
+               << (frameworkId.isSome()
+                     ? "framework " + stringify(frameworkId.get())
+                     : "an operator API call")
+               << " from agent " << slaveId << ": " << uuid.error();
     return;
   }
 
@@ -7367,10 +7375,12 @@ void Master::offerOperationStatusUpdate(
   // tell the framework that the operation is gone.
   if (slave == nullptr) {
     LOG(WARNING) << "Ignoring status update for offer operation '"
-                 << update.status().operation_id() << "' (uuid: "
-                 << uuid->toString() << ") for framework "
-                 << frameworkId << " because agent "
-                 << slaveId << " is not registered";
+                 << update.status().operation_id()
+                 << "' (uuid: " << uuid->toString() << ") for "
+                 << (frameworkId.isSome()
+                       ? "framework " + stringify(frameworkId.get())
+                       : "an operator API call")
+                 << ": Agent " << slaveId << " is not registered";
     return;
   }
 
@@ -7378,14 +7388,39 @@ void Master::offerOperationStatusUpdate(
   if (operation == nullptr) {
     LOG(ERROR) << "Failed to find the offer operation '"
                << update.status().operation_id() << "' (uuid: "
-               << uuid->toString() << ") from framework "
-               << frameworkId << " on agent " << slaveId;
+               << uuid->toString() << ") for "
+               << (frameworkId.isSome()
+                     ? "framework " + stringify(frameworkId.get())
+                     : "an operator API call")
+               << " on agent " << slaveId;
     return;
   }
 
+  // Forward the status update to the framework if needed.
+  if (frameworkId.isSome()) {
+    Framework* framework = getFramework(frameworkId.get());
+
+    if (framework == nullptr || !framework->connected()) {
+      LOG(WARNING) << "Received status update for offer operation '"
+                   << update.status().operation_id()
+                   << "' (uuid: " << uuid->toString() << ") "
+                   << "for framework " << frameworkId.get()
+                   << ", but the framework is "
+                   << (framework == nullptr ? "unknown" : "disconnected");
+    } else {
+      // TODO(jieyu): Forward the status update to the framework.
+    }
+  }
+
   updateOfferOperation(operation, update);
 
-  // TODO(jieyu): Forward the status update to the framework.
+  // If the operation is terminal and no acknowledgement from the
+  // framework (or the operator API endpoint) is needed, remove the
+  // operation.
+  if (protobuf::isTerminalState(operation->latest_status().state()) &&
+      !operation->info().has_id()) {
+    removeOfferOperation(operation);
+  }
 }
 
 
@@ -9615,12 +9650,14 @@ void Master::addOfferOperation(
     Slave* slave,
     OfferOperation* operation)
 {
-  CHECK_NOTNULL(framework);
-  CHECK_NOTNULL(slave);
   CHECK_NOTNULL(operation);
+  CHECK_NOTNULL(slave);
 
   slave->addOfferOperation(operation);
-  framework->addOfferOperation(operation);
+
+  if (framework != nullptr) {
+    framework->addOfferOperation(operation);
+  }
 }
 
 
@@ -9677,12 +9714,15 @@ void Master::updateOfferOperation(
     return;
   }
 
+  // Update resource accounting in the master and in the allocator.
+  // NOTE: For the "old" operations (RESERVE, UNRESERVE, CREATE,
+  // DESTROY), the master speculatively assumes that the operation
+  // will be successful when it accepts the operations. Therefore, we
+  // don't need to update the resource accounting for those types of
+  // offer operations in the master and in the allocator states upon
+  // receiving a terminal status update.
   Resource consumed;
 
-  // For the following "old" operations, the master speculatively
-  // assumes that the operation will be successful when it accepts
-  // the operations. Therefore, we don't need to update the master and
-  // the allocator states upon receiving a terminal status update.
   switch (operation->info().type()) {
     case Offer::Operation::LAUNCH:
       LOG(FATAL) << "Unexpected LAUNCH operation";
@@ -9712,9 +9752,9 @@ void Master::updateOfferOperation(
       return;
   }
 
-  CHECK(update.has_slave_id());
+  CHECK(operation->has_slave_id())
+    << "External resource provider is not supported yet";
 
-  // Update the master and
   switch (operation->latest_status().state()) {
     // Terminal state, and the conversion is successful.
     case OFFER_OPERATION_FINISHED: {
@@ -9723,13 +9763,13 @@ void Master::updateOfferOperation(
 
       allocator->updateAllocation(
           operation->framework_id(),
-          update.slave_id(),
+          operation->slave_id(),
           consumed,
           {ResourceConversion(consumed, converted)});
 
       allocator->recoverResources(
           operation->framework_id(),
-          update.slave_id(),
+          operation->slave_id(),
           converted,
           None());
 
@@ -9741,7 +9781,7 @@ void Master::updateOfferOperation(
     case OFFER_OPERATION_ERROR: {
       allocator->recoverResources(
           operation->framework_id(),
-          update.slave_id(),
+          operation->slave_id(),
           consumed,
           None());
 
@@ -9757,6 +9797,51 @@ void Master::updateOfferOperation(
       break;
     }
   }
+
+  // The slave owns the OfferOperation object and cannot be nullptr.
+  // TODO(jieyu): Revisit this once we introduce support for external
+  // resource provider.
+  Slave* slave = slaves.registered.get(operation->slave_id());
+  CHECK_NOTNULL(slave);
+
+  slave->recoverResources(operation);
+
+  Framework* framework = operation->has_framework_id()
+    ? getFramework(operation->framework_id())
+    : nullptr;
+
+  if (framework != nullptr) {
+    framework->recoverResources(operation);
+  }
+}
+
+
+void Master::removeOfferOperation(OfferOperation* operation)
+{
+  CHECK_NOTNULL(operation);
+
+  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
+    << operation->latest_status().state();
+
+  // Remove from framework.
+  Framework* framework = operation->has_framework_id()
+    ? getFramework(operation->framework_id())
+    : nullptr;
+
+  if (framework != nullptr) {
+    framework->removeOfferOperation(operation);
+  }
+
+  // Remove from slave.
+  CHECK(operation->has_slave_id())
+    << "External resource provider is not supported yet";
+
+  Slave* slave = slaves.registered.get(operation->slave_id());
+  CHECK_NOTNULL(slave);
+
+  slave->removeOfferOperation(operation);
+
+  delete operation;
 }
 
 
@@ -9765,45 +9850,71 @@ Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
   CHECK_NOTNULL(slave);
 
   return allocator->updateAvailable(slave->id, {operation})
-    .onReady(defer(self(), &Master::_apply, slave, operation));
+    .onReady(defer(self(), &Master::_apply, slave, nullptr, operation));
 }
 
 
-void Master::_apply(Slave* slave, const Offer::Operation& operation)
+void Master::_apply(
+    Slave* slave,
+    Framework* framework,
+    const Offer::Operation& operation)
 {
   CHECK_NOTNULL(slave);
 
   slave->apply(operation);
 
-  CheckpointResourcesMessage message;
+  if (slave->capabilities.resourceProvider) {
+    OfferOperation* offerOperation = new OfferOperation(
+        protobuf::createOfferOperation(
+            operation,
+            protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+            framework->id()));
 
-  message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+    addOfferOperation(framework, slave, offerOperation);
 
-  if (!slave->capabilities.reservationRefinement) {
-    // If the agent is not refinement-capable, don't send it
-    // checkpointed resources that contain refined reservations. This
-    // might occur if a reservation refinement is created but never
-    // reaches the agent (e.g., due to network partition), and then
-    // the agent is downgraded before the partition heals.
-    //
-    // TODO(neilc): It would probably be better to prevent the agent
-    // from re-registering in this scenario.
-    Try<Nothing> result = downgradeResources(message.mutable_resources());
-    if (result.isError()) {
-      LOG(WARNING) << "Not sending updated checkpointed resouces "
-                   << slave->checkpointedResources
-                   << " with refined reservations, since agent " << *slave
-                   << " is not RESERVATION_REFINEMENT-capable.";
+    ApplyOfferOperationMessage message;
+    if (framework != nullptr) {
+      message.mutable_framework_id()->CopyFrom(framework->id());
+    }
+    message.mutable_operation_info()->CopyFrom(offerOperation->info());
+    message.set_operation_uuid(offerOperation->operation_uuid());
 
-      return;
+    LOG(INFO) << "Sending offer operation "
+              << offerOperation->operation_uuid()
+              << " to agent " << *slave;
+
+    send(slave->pid, message);
+  } else {
+    CheckpointResourcesMessage message;
+
+    message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+    if (!slave->capabilities.reservationRefinement) {
+      // If the agent is not refinement-capable, don't send it
+      // checkpointed resources that contain refined reservations. This
+      // might occur if a reservation refinement is created but never
+      // reaches the agent (e.g., due to network partition), and then
+      // the agent is downgraded before the partition heals.
+      //
+      // TODO(neilc): It would probably be better to prevent the agent
+      // from re-registering in this scenario.
+      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      if (result.isError()) {
+        LOG(WARNING) << "Not sending updated checkpointed resources "
+                     << slave->checkpointedResources
+                     << " with refined reservations, since agent " << *slave
+                     << " is not RESERVATION_REFINEMENT-capable.";
+
+        return;
+      }
     }
-  }
 
-  LOG(INFO) << "Sending updated checkpointed resources "
-            << slave->checkpointedResources
-            << " to agent " << *slave;
+    LOG(INFO) << "Sending updated checkpointed resources "
+              << slave->checkpointedResources
+              << " to agent " << *slave;
 
-  send(slave->pid, message);
+    send(slave->pid, message);
+  }
 }
 
 
@@ -10651,6 +10762,102 @@ void Slave::addOfferOperation(OfferOperation* operation)
   CHECK_SOME(uuid);
 
   offerOperations.put(uuid.get(), operation);
+
+  Resource consumed;
+  switch (operation->info().type()) {
+    case Offer::Operation::LAUNCH:
+    case Offer::Operation::LAUNCH_GROUP:
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE:
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY:
+      // These operations are speculatively applied and not
+      // tracked as used resources.
+      return;
+    case Offer::Operation::CREATE_VOLUME:
+      consumed = operation->info().create_volume().source();
+      break;
+    case Offer::Operation::DESTROY_VOLUME:
+      consumed = operation->info().destroy_volume().volume();
+      break;
+    case Offer::Operation::CREATE_BLOCK:
+      consumed = operation->info().create_block().source();
+      break;
+    case Offer::Operation::DESTROY_BLOCK:
+      consumed = operation->info().destroy_block().block();
+      break;
+    case Offer::Operation::UNKNOWN:
+      LOG(WARNING) << "Ignoring unknown offer operation";
+      return;
+  }
+
+  usedResources[operation->framework_id()] += consumed;
+}
+
+
+void Slave::recoverResources(OfferOperation* operation)
+{
+  // TODO(jieyu): Currently, we do not keep track of used resources
+  // for offer operations that are created by the operator through the
+  // operator API endpoint.
+  if (!operation->has_framework_id()) {
+    return;
+  }
+
+  const FrameworkID& frameworkId = operation->framework_id();
+
+  Resource consumed;
+  switch (operation->info().type()) {
+    case Offer::Operation::LAUNCH:
+    case Offer::Operation::LAUNCH_GROUP:
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE:
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY:
+      // These operations are speculatively applied and not
+      // tracked as used resources.
+      return;
+    case Offer::Operation::CREATE_VOLUME:
+      consumed = operation->info().create_volume().source();
+      break;
+    case Offer::Operation::DESTROY_VOLUME:
+      consumed = operation->info().destroy_volume().volume();
+      break;
+    case Offer::Operation::CREATE_BLOCK:
+      consumed = operation->info().create_block().source();
+      break;
+    case Offer::Operation::DESTROY_BLOCK:
+      consumed = operation->info().destroy_block().block();
+      break;
+    case Offer::Operation::UNKNOWN: {
+      LOG(ERROR) << "Unknown offer operation";
+      return;
+    }
+  }
+
+  CHECK(usedResources[frameworkId].contains(consumed))
+    << "Unknown resources " << consumed << " of framework " << frameworkId;
+
+  usedResources[frameworkId] -= consumed;
+  if (usedResources[frameworkId].empty()) {
+    usedResources.erase(frameworkId);
+  }
+}
+
+
+void Slave::removeOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(offerOperations.contains(uuid.get()))
+    << "Unknown offer operation (uuid: " << uuid->toString() << ")"
+    << " to agent " << *this;
+
+  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
+    << operation->latest_status().state();
+
+  offerOperations.erase(uuid.get());
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0a30e9e4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 904525d..0c1253a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -147,6 +147,11 @@ struct Slave
   void removeTask(Task* task);
 
   void addOfferOperation(OfferOperation* operation);
+
+  void recoverResources(OfferOperation* operation);
+
+  void removeOfferOperation(OfferOperation* operation);
+
   OfferOperation* getOfferOperation(const UUID& uuid) const;
 
   void addOffer(Offer* offer);
@@ -241,9 +246,9 @@ struct Slave
   // Active inverse offers on this slave.
   hashset<InverseOffer*> inverseOffers;
 
-  // Resources for active task / executors. Note that we maintain multiple
-  // copies of each shared resource in `usedResources` as they are used by
-  // multiple tasks.
+  // Resources for active task / executors / offer operations.
+  // Note that we maintain multiple copies of each shared resource in
+  // `usedResources` as they are used by multiple tasks.
   hashmap<FrameworkID, Resources> usedResources;
 
   Resources offeredResources; // Offers.
@@ -883,6 +888,9 @@ protected:
       OfferOperation* operation,
       const OfferOperationStatusUpdate& update);
 
+  // Remove the offer operation.
+  void removeOfferOperation(OfferOperation* operation);
+
   // Attempts to update the allocator by applying the given operation.
   // If successful, updates the slave's resources, sends a
   // 'CheckpointResourcesMessage' to the slave with the updated
@@ -921,10 +929,15 @@ protected:
   SlaveID newSlaveId();
 
 private:
-  // Updates the slave's resources by applying the given operation.
-  // It also sends a 'CheckpointResourcesMessage' to the slave with
-  // the updated checkpointed resources.
-  void _apply(Slave* slave, const Offer::Operation& operation);
+  // Updates the agent's resources by applying the given operation.
+  // Sends either `ApplyOfferOperationMessage` or
+  // `CheckpointResourcesMessage` (with updated checkpointed
+  // resources) to the agent depending on if the agent has
+  // `RESOURCE_PROVIDER` capability.
+  void _apply(
+      Slave* slave,
+      Framework* framework,
+      const Offer::Operation& operation);
 
   void drop(
       const process::UPID& from,
@@ -2752,14 +2765,153 @@ struct Framework
 
   void addOfferOperation(OfferOperation* operation)
   {
+    CHECK(operation->has_framework_id());
+
+    const FrameworkID& frameworkId = operation->framework_id();
+
     Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
     CHECK_SOME(uuid);
 
+    CHECK(!offerOperations.contains(uuid.get()))
+      << "Duplicate offer operation '" << operation->info().id()
+      << "' (uuid: " << uuid->toString() << ") "
+      << "of framework " << frameworkId;
+
     offerOperations.put(uuid.get(), operation);
 
     if (operation->info().has_id()) {
       offerOperationUUIDs.put(operation->info().id(), uuid.get());
     }
+
+    if (!protobuf::isTerminalState(operation->latest_status().state())) {
+      Resource consumed;
+      switch (operation->info().type()) {
+        case Offer::Operation::LAUNCH:
+        case Offer::Operation::LAUNCH_GROUP:
+        case Offer::Operation::RESERVE:
+        case Offer::Operation::UNRESERVE:
+        case Offer::Operation::CREATE:
+        case Offer::Operation::DESTROY:
+          // These operations are speculatively applied and not
+          // tracked as used resources.
+          return;
+        case Offer::Operation::CREATE_VOLUME:
+          consumed = operation->info().create_volume().source();
+          break;
+        case Offer::Operation::DESTROY_VOLUME:
+          consumed = operation->info().destroy_volume().volume();
+          break;
+        case Offer::Operation::CREATE_BLOCK:
+          consumed = operation->info().create_block().source();
+          break;
+        case Offer::Operation::DESTROY_BLOCK:
+          consumed = operation->info().destroy_block().block();
+          break;
+        case Offer::Operation::UNKNOWN:
+          LOG(ERROR) << "Unknown offer operation";
+          return;
+      }
+
+      CHECK(operation->has_slave_id())
+        << "External resource provider is not supported yet";
+
+      const SlaveID& slaveId = operation->slave_id();
+
+      totalUsedResources += consumed;
+      usedResources[slaveId] += consumed;
+
+      // It's possible that we're not tracking the role from the
+      // resources in the offer operation for this framework if the
+      // role is absent from the framework's set of roles. In this
+      // case, we track the role's allocation for this framework.
+      const std::string& role = consumed.allocation_info().role();
+
+      if (!isTrackedUnderRole(role)) {
+        trackUnderRole(role);
+      }
+    }
+  }
+
+  void recoverResources(OfferOperation* operation)
+  {
+    CHECK(operation->has_slave_id())
+      << "External resource provider is not supported yet";
+
+    const SlaveID& slaveId = operation->slave_id();
+
+    Resource consumed;
+    switch (operation->info().type()) {
+      case Offer::Operation::LAUNCH:
+      case Offer::Operation::LAUNCH_GROUP:
+      case Offer::Operation::RESERVE:
+      case Offer::Operation::UNRESERVE:
+      case Offer::Operation::CREATE:
+      case Offer::Operation::DESTROY:
+        // These operations are speculatively applied and not
+        // tracked as used resources.
+        return;
+      case Offer::Operation::CREATE_VOLUME:
+        consumed = operation->info().create_volume().source();
+        break;
+      case Offer::Operation::DESTROY_VOLUME:
+        consumed = operation->info().destroy_volume().volume();
+        break;
+      case Offer::Operation::CREATE_BLOCK:
+        consumed = operation->info().create_block().source();
+        break;
+      case Offer::Operation::DESTROY_BLOCK:
+        consumed = operation->info().destroy_block().block();
+        break;
+      case Offer::Operation::UNKNOWN:
+        LOG(WARNING) << "Ignoring unknown offer operation";
+        return;
+    }
+
+    CHECK(totalUsedResources.contains(consumed))
+      << "Tried to recover resources " << consumed
+      << " which do not seem used";
+
+    CHECK(usedResources[slaveId].contains(consumed))
+      << "Tried to recover resources " << consumed << " of agent "
+      << slaveId << " which do not seem used";
+
+    totalUsedResources -= consumed;
+    usedResources[slaveId] -= consumed;
+    if (usedResources[slaveId].empty()) {
+      usedResources.erase(slaveId);
+    }
+
+    // If we are no longer subscribed to the role to which these
+    // resources are being returned to, and we have no more resources
+    // allocated to us for that role, stop tracking the framework
+    // under the role.
+    const std::string& role = consumed.allocation_info().role();
+
+    auto allocatedToRole = [&role](const Resource& resource) {
+      return resource.allocation_info().role() == role;
+    };
+
+    if (roles.count(role) == 0 &&
+        totalUsedResources.filter(allocatedToRole).empty()) {
+      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+      untrackUnderRole(role);
+    }
+  }
+
+  void removeOfferOperation(OfferOperation* operation)
+  {
+    Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+    CHECK_SOME(uuid);
+
+    CHECK(offerOperations.contains(uuid.get()))
+      << "Unknown offer operation '" << operation->info().id()
+      << "' (uuid: " << uuid->toString() << ") "
+      << "of framework " << operation->framework_id();
+
+    CHECK(protobuf::isTerminalState(operation->latest_status().state()))
+      << operation->latest_status().state();
+
+    offerOperations.erase(uuid.get());
   }
 
   const FrameworkID id() const { return info.id(); }
@@ -3047,7 +3199,7 @@ struct Framework
   // TODO(mpark): Strip the non-scalar resources out of the totals
   // in order to avoid reporting incorrect statistics (MESOS-2623).
 
-  // Active task / executor resources.
+  // Active task / executor / offer operation resources.
   Resources totalUsedResources;
 
   // Note that we maintain multiple copies of each shared resource in

http://git-wip-us.apache.org/repos/asf/mesos/blob/0a30e9e4/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 3244c1f..0cc6b40 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -694,7 +694,7 @@ message UpdateSlaveMessage {
  * See resource_provider::Call::UPDATE_OFFER_OPERATION_STATUS.
  */
 message OfferOperationStatusUpdate {
-  required FrameworkID framework_id = 1;
+  optional FrameworkID framework_id = 1;
   optional SlaveID slave_id = 2;
   required OfferOperationStatus status = 3;
   optional OfferOperationStatus latest_status = 4;
@@ -715,7 +715,7 @@ message OfferOperationStatusUpdate {
  * See resource_provider::Event::OPERATION.
  */
 message ApplyOfferOperationMessage {
-  required FrameworkID framework_id = 1;
+  optional FrameworkID framework_id = 1;
   required Offer.Operation operation_info = 2;
 
   // This is the internal UUID for the operation, which is kept


[08/14] mesos git commit: Added latest_status field to OfferOperation protobuf.

Posted by ji...@apache.org.
Added latest_status field to OfferOperation protobuf.

Review: https://reviews.apache.org/r/63481


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9c2a38b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9c2a38b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9c2a38b

Branch: refs/heads/master
Commit: c9c2a38bff084027b990c5a0c7fac68bcea09658
Parents: eb5e65c
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 12:38:19 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto    | 5 +++--
 include/mesos/v1/mesos.proto | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c9c2a38b/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 68a5538..26e9b6c 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2183,16 +2183,17 @@ message TaskResourceLimitation {
 message OfferOperation {
   required FrameworkID framework_id = 1;
   required Offer.Operation info = 2;
+  required OfferOperationStatus latest_status = 3;
 
   // All the statuses known to this offer operation. Some of the
   // statuses in this list might not have been acknowledged yet. The
   // statuses are ordered.
-  repeated OfferOperationStatus statuses = 3;
+  repeated OfferOperationStatus statuses = 4;
 
   // This is the internal UUID for the operation, which is kept
   // independently from the framework-specified operation ID, which is
   // optional.
-  required bytes operation_uuid = 4;
+  required bytes operation_uuid = 5;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9c2a38b/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index c46cec7..7770b0a 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2164,16 +2164,17 @@ message TaskResourceLimitation {
 message OfferOperation {
   required FrameworkID framework_id = 1;
   required Offer.Operation info = 2;
+  required OfferOperationStatus latest_status = 3;
 
   // All the statuses known to this offer operation. Some of the
   // statuses in this list might not have been acknowledged yet. The
   // statuses are ordered.
-  repeated OfferOperationStatus statuses = 3;
+  repeated OfferOperationStatus statuses = 4;
 
   // This is the internal UUID for the operation, which is kept
   // independently from the framework-specified operation ID, which is
   // optional.
-  required bytes operation_uuid = 4;
+  required bytes operation_uuid = 5;
 }
 
 


[05/14] mesos git commit: Added a helper to get the resource provider ID from an offer operation.

Posted by ji...@apache.org.
Added a helper to get the resource provider ID from an offer operation.

Review: https://reviews.apache.org/r/63414


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cdac390
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cdac390
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cdac390

Branch: refs/heads/master
Commit: 9cdac390632831777632a0d437a51654ff2f4759
Parents: 88dbbce
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Oct 30 13:56:09 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/common/resources_utils.cpp | 48 +++++++++++++++++++++++++++++++++++++
 src/common/resources_utils.hpp |  9 +++++++
 2 files changed, 57 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9cdac390/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index e34cd8a..8304da4 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -91,6 +91,54 @@ Try<Resources> applyCheckpointedResources(
 }
 
 
+Result<ResourceProviderID> getResourceProviderId(
+    const Offer::Operation& operation)
+{
+  Option<Resource> resource;
+
+  switch (operation.type()) {
+    case Offer::Operation::LAUNCH:
+      return Error("Unexpected LAUNCH operation");
+    case Offer::Operation::LAUNCH_GROUP:
+      return Error("Unexpected LAUNCH_GROUP operation");
+    case Offer::Operation::RESERVE:
+      resource = operation.reserve().resources(0);
+      break;
+    case Offer::Operation::UNRESERVE:
+      resource = operation.unreserve().resources(0);
+      break;
+    case Offer::Operation::CREATE:
+      resource = operation.create().volumes(0);
+      break;
+    case Offer::Operation::DESTROY:
+      resource = operation.destroy().volumes(0);
+      break;
+    case Offer::Operation::CREATE_VOLUME:
+      resource = operation.create_volume().source();
+      break;
+    case Offer::Operation::DESTROY_VOLUME:
+      resource = operation.destroy_volume().volume();
+      break;
+    case Offer::Operation::CREATE_BLOCK:
+      resource = operation.create_block().source();
+      break;
+    case Offer::Operation::DESTROY_BLOCK:
+      resource = operation.destroy_block().block();
+      break;
+    case Offer::Operation::UNKNOWN:
+      return Error("Unknown offer operation");
+  }
+
+  CHECK_SOME(resource);
+
+  if (resource->has_provider_id()) {
+    return resource->provider_id();
+  }
+
+  return None();
+}
+
+
 void convertResourceFormat(Resource* resource, ResourceFormat format)
 {
   switch (format) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cdac390/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index 18e3d9d..b8c29b3 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -46,6 +46,15 @@ Try<Resources> applyCheckpointedResources(
     const Resources& checkpointedResources);
 
 
+// Returns the resource provider ID associated with the given
+// operation. Returns None() if the operation is for agent default
+// resources. We assume the given operation is validated. Therefore,
+// the specified operation should not contain resources from more than
+// one resource provider.
+Result<ResourceProviderID> getResourceProviderId(
+    const Offer::Operation& operation);
+
+
 // Resource format options to be used with the `convertResourceFormat` function.
 //
 // The preconditions of the options are asymmetric, centered around the


[02/14] mesos git commit: Implemented updateOfferOperation in the master.

Posted by ji...@apache.org.
Implemented updateOfferOperation in the master.

Review: https://reviews.apache.org/r/63592


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3245ed59
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3245ed59
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3245ed59

Branch: refs/heads/master
Commit: 3245ed5943f4dea870bac8a997757c1f98828aa1
Parents: d73a616
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Nov 3 11:58:45 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 131 ++++++++++++++++++++++++++++++++++++++++++++-
 src/master/master.hpp |   2 +-
 2 files changed, 131 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3245ed59/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2d340c4..613c9eb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9626,8 +9626,137 @@ void Master::addOfferOperation(
 
 void Master::updateOfferOperation(
     OfferOperation* operation,
-    OfferOperationStatusUpdate update)
+    const OfferOperationStatusUpdate& update)
 {
+  CHECK_NOTNULL(operation);
+
+  const OfferOperationStatus& status = update.status();
+
+  Option<OfferOperationStatus> latestStatus;
+  if (update.has_latest_status()) {
+    latestStatus = update.latest_status();
+  }
+
+  // Whether the offer operation has just become terminated.
+  Option<bool> terminated;
+
+  if (latestStatus.isSome()) {
+    terminated =
+      !protobuf::isTerminalState(operation->latest_status().state()) &&
+      protobuf::isTerminalState(latestStatus->state());
+
+    // If the operation has already transitioned to a terminal state,
+    // do not update its state.
+    if (!protobuf::isTerminalState(operation->latest_status().state())) {
+      operation->mutable_latest_status()->CopyFrom(latestStatus.get());
+    }
+  } else {
+    terminated =
+      !protobuf::isTerminalState(operation->latest_status().state()) &&
+      protobuf::isTerminalState(status.state());
+
+    if (!protobuf::isTerminalState(operation->latest_status().state())) {
+      operation->mutable_latest_status()->CopyFrom(status);
+    }
+  }
+
+  operation->add_statuses()->CopyFrom(status);
+
+  Try<UUID> uuid = UUID::fromBytes(update.operation_uuid());
+  CHECK_SOME(uuid);
+
+  LOG(INFO) << "Updating the state of offer operation '"
+            << operation->info().id() << "' (uuid: " << uuid->toString()
+            << ") of framework " << operation->framework_id()
+            << " (latest state: " << operation->latest_status().state()
+            << ", status update state: " << status.state() << ")";
+
+  CHECK_SOME(terminated);
+
+  if (!terminated.get()) {
+    return;
+  }
+
+  Resource consumed;
+
+  // For the following "old" operations, the master speculatively
+  // assumes that the operation will be successful when it accepts
+  // the operations. Therefore, we don't need to update the master and
+  // the allocator states upon receiving a terminal status update.
+  switch (operation->info().type()) {
+    case Offer::Operation::LAUNCH:
+      LOG(FATAL) << "Unexpected LAUNCH operation";
+      break;
+    case Offer::Operation::LAUNCH_GROUP:
+      LOG(FATAL) << "Unexpected LAUNCH_GROUP operation";
+      break;
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE:
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY:
+      return;
+    case Offer::Operation::CREATE_VOLUME:
+      consumed = operation->info().create_volume().source();
+      break;
+    case Offer::Operation::DESTROY_VOLUME:
+      consumed = operation->info().destroy_volume().volume();
+      break;
+    case Offer::Operation::CREATE_BLOCK:
+      consumed = operation->info().create_block().source();
+      break;
+    case Offer::Operation::DESTROY_BLOCK:
+      consumed = operation->info().destroy_block().block();
+      break;
+    case Offer::Operation::UNKNOWN:
+      LOG(ERROR) << "Unknown offer operation";
+      return;
+  }
+
+  CHECK(update.has_slave_id());
+
+  // Update the allocator according to the operation's terminal state.
+  switch (operation->latest_status().state()) {
+    // Terminal state, and the conversion is successful.
+    case OFFER_OPERATION_FINISHED: {
+      const Resources converted =
+        operation->latest_status().converted_resources();
+
+      allocator->updateAllocation(
+          operation->framework_id(),
+          update.slave_id(),
+          consumed,
+          {ResourceConversion(consumed, converted)});
+
+      allocator->recoverResources(
+          operation->framework_id(),
+          update.slave_id(),
+          converted,
+          None());
+
+      break;
+    }
+
+    // Terminal state, and the conversion has failed.
+    case OFFER_OPERATION_FAILED:
+    case OFFER_OPERATION_ERROR: {
+      allocator->recoverResources(
+          operation->framework_id(),
+          update.slave_id(),
+          consumed,
+          None());
+
+      break;
+    }
+
+    // Non-terminal. This shouldn't happen.
+    case OFFER_OPERATION_PENDING:
+    case OFFER_OPERATION_UNSUPPORTED: {
+      LOG(FATAL) << "Unexpected offer operation state "
+                 << operation->latest_status().state();
+
+      break;
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3245ed59/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4c18258..904525d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -881,7 +881,7 @@ protected:
   // offer operation becomes terminal.
   void updateOfferOperation(
       OfferOperation* operation,
-      OfferOperationStatusUpdate update);
+      const OfferOperationStatusUpdate& update);
 
   // Attempts to update the allocator by applying the given operation.
   // If successful, updates the slave's resources, sends a


[06/14] mesos git commit: Added the initial implementation for applying offer operations.

Posted by ji...@apache.org.
Added the initial implementation for applying offer operations.

The resource provider manager provides an `applyOfferOperation` method
for offer operation affecting resource providers. The resources on
which the operation should be applied contains a resource provider ID.
This will be extracted and an event will be sent to the respective
resource provider.

(This is based on https://reviews.apache.org/r/61810)

Review: https://reviews.apache.org/r/63480


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97062ac8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97062ac8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97062ac8

Branch: refs/heads/master
Commit: 97062ac861b2642d6a882226b767f3ccd1a3c1db
Parents: 9cdac39
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Oct 30 13:57:09 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/messages/messages.proto       |   4 +-
 src/resource_provider/manager.cpp | 128 +++++++++++++++++++++++++++------
 src/resource_provider/manager.hpp |   4 ++
 src/slave/slave.cpp               |  34 ++++++++-
 4 files changed, 148 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 2fbca22..1610c2b 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -705,7 +705,9 @@ message OfferOperationStatusUpdate {
 
 
 /**
- * This message is sent from the master to the agent to apply an offer
+ * This message is sent from the master to the resource provider
+ * manager (either on the agent for local resource providers, or on
+ * the master for external resource providers) to apply an offer
  * operation.
  *
  * See resource_provider::Event::OPERATION.

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 11f8901..a878507 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -37,6 +37,7 @@
 
 #include "common/http.hpp"
 #include "common/recordio.hpp"
+#include "common/resources_utils.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -52,7 +53,6 @@ using mesos::resource_provider::Event;
 
 using process::Failure;
 using process::Future;
-using process::Owned;
 using process::Process;
 using process::ProcessBase;
 using process::Queue;
@@ -140,9 +140,9 @@ public:
       const http::Request& request,
       const Option<Principal>& principal);
 
-  Queue<ResourceProviderMessage> messages;
+  void applyOfferOperation(const ApplyOfferOperationMessage& message);
 
-  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+  Queue<ResourceProviderMessage> messages;
 
 private:
   void subscribe(
@@ -158,6 +158,11 @@ private:
       const Call::UpdateState& update);
 
   ResourceProviderID newResourceProviderId();
+
+  struct ResourceProviders
+  {
+    hashmap<ResourceProviderID, ResourceProvider> subscribed;
+  } resourceProviders;
 };
 
 
@@ -254,11 +259,12 @@ Future<http::Response> ResourceProviderManagerProcess::api(
     return ok;
   }
 
-  if (!resourceProviders.contains(call.resource_provider_id())) {
-    return BadRequest("Resource provider cannot be found");
+  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+    return BadRequest("Resource provider is not subscribed");
   }
 
-  auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+  ResourceProvider& resourceProvider =
+    resourceProviders.subscribed.at(call.resource_provider_id());
 
   // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
   if (!request.headers.contains("Mesos-Stream-Id")) {
@@ -302,6 +308,69 @@ Future<http::Response> ResourceProviderManagerProcess::api(
 }
 
 
+void ResourceProviderManagerProcess::applyOfferOperation(
+    const ApplyOfferOperationMessage& message)
+{
+  const Offer::Operation& operation = message.operation_info();
+  const FrameworkID& frameworkId = message.framework_id();
+
+  Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << operation.id() << "' from framework "
+               << frameworkId << ": " << uuid.error();
+    return;
+  }
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation);
+
+  if (!resourceProviderId.isSome()) {
+    LOG(ERROR) << "Failed to get the resource provider ID of operation "
+               << "'" << operation.id() << "' (uuid: " << uuid->toString()
+               << ") from framework " << frameworkId << ": "
+               << (resourceProviderId.isError() ? resourceProviderId.error()
+                                                : "Not found");
+    return;
+  }
+
+  if (!resourceProviders.subscribed.contains(resourceProviderId.get())) {
+    LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: "
+                 << uuid.get() << ") from framework " << frameworkId
+                 << " because resource provider " << resourceProviderId.get()
+                 << " is not subscribed";
+    return;
+  }
+
+  ResourceProvider& resourceProvider =
+    resourceProviders.subscribed.at(resourceProviderId.get());
+
+  CHECK(message.resource_version_uuid().has_resource_provider_id());
+
+  CHECK_EQ(message.resource_version_uuid().resource_provider_id(),
+           resourceProviderId.get())
+    << "Resource provider ID "
+    << message.resource_version_uuid().resource_provider_id()
+    << " in resource version UUID does not match that in the operation "
+    << resourceProviderId.get();
+
+  Event event;
+  event.set_type(Event::OPERATION);
+  event.mutable_operation()->mutable_framework_id()->CopyFrom(frameworkId);
+  event.mutable_operation()->mutable_info()->CopyFrom(operation);
+  event.mutable_operation()->set_operation_uuid(message.operation_uuid());
+  event.mutable_operation()->set_resource_version_uuid(
+      message.resource_version_uuid().uuid());
+
+  if (!resourceProvider.http.send(event)) {
+    LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
+                 << "(uuid: " << uuid.get() << ") from framework "
+                 << frameworkId << " to resource provider "
+                 << resourceProviderId.get() << ": connection closed";
+  }
+}
+
+
 void ResourceProviderManagerProcess::subscribe(
     const HttpConnection& http,
     const Call::Subscribe& subscribe)
@@ -309,27 +378,36 @@ void ResourceProviderManagerProcess::subscribe(
   ResourceProviderInfo resourceProviderInfo =
     subscribe.resource_provider_info();
 
-  // TODO(chhsiao): Reject the subscription if it contains an unknown ID
-  // or there is already a subscribed instance with the same ID, and add
-  // tests for re-subscriptions.
+  LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
+
   if (!resourceProviderInfo.has_id()) {
+    // The resource provider is subscribing for the first time.
     resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
-  }
 
-  ResourceProvider resourceProvider(resourceProviderInfo, http);
+    ResourceProvider resourceProvider(resourceProviderInfo, http);
 
-  Event event;
-  event.set_type(Event::SUBSCRIBED);
-  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
-      resourceProvider.info.id());
+    Event event;
+    event.set_type(Event::SUBSCRIBED);
+    event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+        resourceProvider.info.id());
 
-  if (!resourceProvider.http.send(event)) {
-    LOG(WARNING) << "Unable to send event to resource provider "
-                 << stringify(resourceProvider.info.id())
-                 << ": connection closed";
+    if (!resourceProvider.http.send(event)) {
+      LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
+                   << resourceProvider.info.id() << ": connection closed";
+    }
+
+    // TODO(jieyu): Start heartbeat for the resource provider.
+
+    resourceProviders.subscribed.put(
+        resourceProviderInfo.id(),
+        resourceProvider);
+
+    return;
   }
 
-  resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+  // TODO(chhsiao): Reject the subscription if it contains an unknown
+  // ID or there is already a subscribed instance with the same ID,
+  // and add tests for re-subscriptions.
 }
 
 
@@ -402,6 +480,16 @@ Future<http::Response> ResourceProviderManager::api(
 }
 
 
+void ResourceProviderManager::applyOfferOperation(
+    const ApplyOfferOperationMessage& message) const
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::applyOfferOperation,
+      message);
+}
+
+
 Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
 {
   return process->messages;

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 3b70e75..e7a9a6c 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -23,6 +23,8 @@
 #include <process/owned.hpp>
 #include <process/queue.hpp>
 
+#include "messages/messages.hpp"
+
 #include "resource_provider/message.hpp"
 
 namespace mesos {
@@ -49,6 +51,8 @@ public:
       const process::http::Request& request,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
+
   // Returns a stream of messages from the resource provider manager.
   process::Queue<ResourceProviderMessage> messages() const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6cbe209..c108239 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3626,7 +3626,39 @@ Try<Nothing> Slave::syncCheckpointedResources(
 
 void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
 {
-  // TODO(nfnt): Provide implementation here.
+  Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << message.operation_info().id() << "' "
+               << "from framework " << message.framework_id()
+               << ": " << uuid.error();
+    return;
+  }
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(message.operation_info());
+
+  if (resourceProviderId.isError()) {
+    LOG(ERROR) << "Failed to get the resource provider ID of operation "
+               << "'" << message.operation_info().id() << "' "
+               << "(uuid: " << uuid->toString() << ") from framework "
+               << message.framework_id() << ": " << resourceProviderId.error();
+    return;
+  }
+
+  if (resourceProviderId.isSome()) {
+    resourceProviderManager.applyOfferOperation(message);
+    return;
+  }
+
+  // TODO(jieyu): Handle operations for agent default resources. To
+  // support rollback, the agent needs to checkpoint the total
+  // resources using the old format (i.e., using `resources.info`).
+  // It's OK that the offer operations are not checkpointed atomically
+  // with the total resources for agent default resources. This is
+  // because the master does not rely on operation feedback to update
+  // the allocation for old operations, and agent default resources
+  // only support old operations.
 }
 
 


[04/14] mesos git commit: Added streaming functions for offer operation ID.

Posted by ji...@apache.org.
Added streaming functions for offer operation ID.

Review: https://reviews.apache.org/r/63413


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88dbbce8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88dbbce8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88dbbce8

Branch: refs/heads/master
Commit: 88dbbce8de8cb35e2d35fe44f2df802d69c9b561
Parents: a94e3cd
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Oct 30 13:52:53 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/type_utils.hpp | 5 +++++
 include/mesos/v1/mesos.hpp   | 5 +++++
 src/common/type_utils.cpp    | 6 ++++++
 src/v1/mesos.cpp             | 6 ++++++
 4 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/88dbbce8/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 290b0aa..2452396 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -371,6 +371,11 @@ std::ostream& operator<<(std::ostream& stream, const MasterInfo& master);
 std::ostream& operator<<(std::ostream& stream, const OfferID& offerId);
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const OfferOperationID& offerOperationId);
+
+
 std::ostream& operator<<(std::ostream& stream, const RateLimits& limits);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/88dbbce8/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 1b08d7a..4b53b5c 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -359,6 +359,11 @@ std::ostream& operator<<(std::ostream& stream, const MasterInfo& master);
 std::ostream& operator<<(std::ostream& stream, const OfferID& offerId);
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const OfferOperationID& offerOperationId);
+
+
 std::ostream& operator<<(std::ostream& stream, const RateLimits& limits);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/88dbbce8/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index e40d87c..b989b4d 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -607,6 +607,12 @@ ostream& operator<<(ostream& stream, const OfferID& offerId)
 }
 
 
+ostream& operator<<(ostream& stream, const OfferOperationID& offerOperationId)
+{
+  return stream << offerOperationId.value();
+}
+
+
 ostream& operator<<(ostream& stream, const RateLimits& limits)
 {
   return stream << limits.DebugString();

http://git-wip-us.apache.org/repos/asf/mesos/blob/88dbbce8/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 306f7e4..ccb4d2a 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -509,6 +509,12 @@ ostream& operator<<(ostream& stream, const OfferID& offerId)
 }
 
 
+ostream& operator<<(ostream& stream, const OfferOperationID& offerOperationId)
+{
+  return stream << offerOperationId.value();
+}
+
+
 ostream& operator<<(ostream& stream, const RateLimits& limits)
 {
   return stream << limits.DebugString();


[12/14] mesos git commit: Added some protobuf helpers for offer operations.

Posted by ji...@apache.org.
Added some protobuf helpers for offer operations.

Review: https://reviews.apache.org/r/63483


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/59646a6c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/59646a6c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/59646a6c

Branch: refs/heads/master
Commit: 59646a6c92ba9030993a3f163a8ba5c3d0c071e1
Parents: 11e36c1
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 12:39:51 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp | 49 ++++++++++++++++++++++++++++++++++++++
 src/common/protobuf_utils.hpp | 12 ++++++++++
 2 files changed, 61 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/59646a6c/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 4ce7021..de18cea 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -36,6 +36,7 @@
 #include <stout/foreach.hpp>
 #include <stout/net.hpp>
 #include <stout/stringify.hpp>
+#include <stout/unreachable.hpp>
 #include <stout/uuid.hpp>
 
 #include <stout/os/permissions.hpp>
@@ -393,13 +394,61 @@ Option<ContainerStatus> getTaskContainerStatus(const Task& task)
 }
 
 
+bool isTerminalState(const OfferOperationState& state)
+{
+  switch (state) {
+    case OFFER_OPERATION_FINISHED:
+    case OFFER_OPERATION_FAILED:
+    case OFFER_OPERATION_ERROR:
+      return true;
+    case OFFER_OPERATION_PENDING:
+    case OFFER_OPERATION_UNSUPPORTED:
+      return false;
+  }
+
+  UNREACHABLE();
+}
+
+
+OfferOperationStatus createOfferOperationStatus(
+    const OfferOperationState& state,
+    const Option<OfferOperationID>& operationId,
+    const Option<string>& message,
+    const Option<Resources>& convertedResources,
+    const Option<UUID>& statusUUID)
+{
+  OfferOperationStatus status;
+  status.set_state(state);
+
+  if (operationId.isSome()) {
+    status.mutable_operation_id()->CopyFrom(operationId.get());
+  }
+
+  if (message.isSome()) {
+    status.set_message(message.get());
+  }
+
+  if (convertedResources.isSome()) {
+    status.mutable_converted_resources()->CopyFrom(convertedResources.get());
+  }
+
+  if (statusUUID.isSome()) {
+    status.set_status_uuid(statusUUID->toBytes());
+  }
+
+  return status;
+}
+
+
 OfferOperation createOfferOperation(
     const Offer::Operation& info,
+    const OfferOperationStatus& latestStatus,
     const FrameworkID& frameworkId)
 {
   OfferOperation operation;
   operation.mutable_framework_id()->CopyFrom(frameworkId);
   operation.mutable_info()->CopyFrom(info);
+  operation.mutable_latest_status()->CopyFrom(latestStatus);
   operation.set_operation_uuid(UUID::random().toBytes());
 
   return operation;

http://git-wip-us.apache.org/repos/asf/mesos/blob/59646a6c/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index da0a84e..5e476be 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -149,8 +149,20 @@ Option<CheckStatusInfo> getTaskCheckStatus(const Task& task);
 Option<ContainerStatus> getTaskContainerStatus(const Task& task);
 
 
+bool isTerminalState(const OfferOperationState& state);
+
+
+OfferOperationStatus createOfferOperationStatus(
+    const OfferOperationState& state,
+    const Option<OfferOperationID>& operationId = None(),
+    const Option<std::string>& message = None(),
+    const Option<Resources>& convertedResources = None(),
+    const Option<UUID>& statusUUID = None());
+
+
 OfferOperation createOfferOperation(
     const Offer::Operation& info,
+    const OfferOperationStatus& latestStatus,
     const FrameworkID& frameworkId);
 
 


[11/14] mesos git commit: Updated `Allocator::updateAllocation` to take `ResourceConversion`s.

Posted by ji...@apache.org.
Updated `Allocator::updateAllocation` to take `ResourceConversion`s.

Instead of having it take offer operations, it is clearer to keep the
allocator unaware of offer operations. This patch changes the interface of
the `updateAllocation` call to take `ResourceConversion`s.

Review: https://reviews.apache.org/r/63523


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d73a6162
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d73a6162
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d73a6162

Branch: refs/heads/master
Commit: d73a6162c467abadc5f9bb5c05034ba374aa0b68
Parents: e55309a
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Nov 2 20:33:27 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/allocator/allocator.hpp       |   2 +-
 src/master/allocator/mesos/allocator.hpp    |   8 +-
 src/master/allocator/mesos/hierarchical.cpp | 161 ++++++-----------------
 src/master/allocator/mesos/hierarchical.hpp |   2 +-
 src/master/master.cpp                       | 117 ++++++++++++----
 src/tests/allocator.hpp                     |   2 +-
 src/tests/hierarchical_allocator_tests.cpp  |  39 ++++--
 7 files changed, 162 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/include/mesos/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp
index 537658b..ae12200 100644
--- a/include/mesos/allocator/allocator.hpp
+++ b/include/mesos/allocator/allocator.hpp
@@ -268,7 +268,7 @@ public:
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       const Resources& offeredResources,
-      const std::vector<Offer::Operation>& operations) = 0;
+      const std::vector<ResourceConversion>& conversions) = 0;
 
   /**
    * Updates available resources on an agent based on a sequence of offer

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 903edf6..8fa4fde 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -119,7 +119,7 @@ public:
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       const Resources& offeredResources,
-      const std::vector<Offer::Operation>& operations);
+      const std::vector<ResourceConversion>& conversions);
 
   process::Future<Nothing> updateAvailable(
       const SlaveID& slaveId,
@@ -260,7 +260,7 @@ public:
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       const Resources& offeredResources,
-      const std::vector<Offer::Operation>& operations) = 0;
+      const std::vector<ResourceConversion>& conversions) = 0;
 
   virtual process::Future<Nothing> updateAvailable(
       const SlaveID& slaveId,
@@ -540,7 +540,7 @@ inline void MesosAllocator<AllocatorProcess>::updateAllocation(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& offeredResources,
-    const std::vector<Offer::Operation>& operations)
+    const std::vector<ResourceConversion>& conversions)
 {
   process::dispatch(
       process,
@@ -548,7 +548,7 @@ inline void MesosAllocator<AllocatorProcess>::updateAllocation(
       frameworkId,
       slaveId,
       offeredResources,
-      operations);
+      conversions);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 848e9da..f0f1111 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -724,7 +724,7 @@ void HierarchicalAllocatorProcess::updateAllocation(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     const Resources& offeredResources,
-    const vector<Offer::Operation>& operations)
+    const vector<ResourceConversion>& conversions)
 {
   CHECK(initialized);
   CHECK(slaves.contains(slaveId));
@@ -750,101 +750,22 @@ void HierarchicalAllocatorProcess::updateAllocation(
     frameworkSorter->allocation(frameworkId.value(), slaveId);
 
   // We keep a copy of the offered resources here and it is updated
-  // by the operations.
-  Resources updatedOfferedResources = offeredResources;
-
-  // Accumulate consumed resources for all tasks in all `LAUNCH` operations.
+  // by the specified resource conversions.
   //
-  // For LAUNCH operations we support tasks requesting more instances of
-  // shared resources than those being offered. We keep track of total
-  // consumed resources to determine the additional instances and allocate
-  // them as part of updating the framework's allocation (i.e., add
-  // them to the allocated resources in the allocator and in each
-  // of the sorters).
-  Resources consumed;
-
-  // Used for logging.
-  hashset<TaskID> taskIds;
-
-  foreach (const Offer::Operation& operation, operations) {
-    // The operations should have been normalized by the master via
-    // `protobuf::injectAllocationInfo()`.
-    //
-    // TODO(bmahler): Check that the operations have the allocation
-    // info set. The master should enforce this. E.g.
-    //
-    //  foreach (const Offer::Operation& operation, operations) {
-    //    CHECK_NONE(validateOperationOnAllocatedResources(operation));
-    //  }
-
-    // Update the offered resources based on this operation.
-    switch (operation.type()) {
-      case Offer::Operation::LAUNCH:
-      case Offer::Operation::LAUNCH_GROUP:
-        // No need to apply LAUNCH and LAUNCH_GROUP.
-        break;
-      case Offer::Operation::RESERVE:
-      case Offer::Operation::UNRESERVE:
-      case Offer::Operation::CREATE:
-      case Offer::Operation::DESTROY: {
-        Try<Resources> _updatedOfferedResources =
-          updatedOfferedResources.apply(operation);
-
-        CHECK_SOME(_updatedOfferedResources);
-        updatedOfferedResources = _updatedOfferedResources.get();
-        break;
-      }
-      case Offer::Operation::CREATE_VOLUME:
-      case Offer::Operation::DESTROY_VOLUME:
-      case Offer::Operation::CREATE_BLOCK:
-      case Offer::Operation::DESTROY_BLOCK:
-        // TODO(jieyu): Add implementations here.
-        break;
-      case Offer::Operation::UNKNOWN:
-        UNREACHABLE();
-        break;
-    }
-
-    if (operation.type() == Offer::Operation::LAUNCH) {
-      foreach (const TaskInfo& task, operation.launch().task_infos()) {
-        taskIds.insert(task.task_id());
-
-        // For now we only need to look at the task resources and
-        // ignore the executor resources.
-        //
-        // TODO(anindya_sinha): For simplicity we currently don't
-        // allow shared resources in ExecutorInfo. The reason is that
-        // the allocator has no idea if the executor within the task
-        // represents a new executor. Therefore we cannot reliably
-        // determine if the executor resources are needed for this task.
-        // The TODO is to support it. We need to pass in the information
-        // pertaining to the executor before enabling shared resources
-        // in the executor.
-        consumed += task.resources();
-      }
-    }
-  }
-
-  // Check that offered resources contain at least one copy of each
-  // consumed shared resource (guaranteed by master validation).
-  Resources consumedShared = consumed.shared();
-  Resources updatedOfferedShared = updatedOfferedResources.shared();
-
-  foreach (const Resource& resource, consumedShared) {
-    CHECK(updatedOfferedShared.contains(resource));
-  }
-
-  // Determine the additional instances of shared resources needed to be
-  // added to the allocations.
-  Resources additional = consumedShared - updatedOfferedShared;
-
-  if (!additional.empty()) {
-    LOG(INFO) << "Allocating additional resources " << additional
-              << " for tasks " << stringify(taskIds)
-              << " of framework " << frameworkId << " on agent " << slaveId;
+  // The resources in the resource conversions should have been
+  // normalized by the master (contains proper AllocationInfo).
+  //
+  // TODO(bmahler): Check that the resources in the resource
+  // conversions have AllocationInfo set. The master should enforce
+  // this. E.g.
+  //
+  //  foreach (const ResourceConversion& conversion, conversions) {
+  //    CHECK_NONE(validateConversionOnAllocatedResources(conversion));
+  //  }
+  Try<Resources> _updatedOfferedResources = offeredResources.apply(conversions);
+  CHECK_SOME(_updatedOfferedResources);
 
-    updatedOfferedResources += additional;
-  }
+  const Resources& updatedOfferedResources = _updatedOfferedResources.get();
 
   // Update the per-slave allocation.
   slave.allocated -= offeredResources;
@@ -880,36 +801,34 @@ void HierarchicalAllocatorProcess::updateAllocation(
   // the agent's total resources shouldn't contain:
   // 1. The additionally allocated shared resources.
   // 2. `AllocationInfo` as set in `updatedOfferedResources`.
-
-  // We strip `AllocationInfo` from operations in order to apply them
+  //
+  // We strip `AllocationInfo` from conversions in order to apply them
   // successfully, since agent's total is stored as unallocated resources.
-  vector<Offer::Operation> strippedOperations;
-  foreach (Offer::Operation operation, operations) {
-    switch (operation.type()) {
-      case Offer::Operation::LAUNCH:
-      case Offer::Operation::LAUNCH_GROUP:
-        // No need to apply LAUNCH and LAUNCH_GROUP.
-        break;
-      case Offer::Operation::RESERVE:
-      case Offer::Operation::UNRESERVE:
-      case Offer::Operation::CREATE:
-      case Offer::Operation::DESTROY:
-        protobuf::stripAllocationInfo(&operation);
-        strippedOperations.push_back(operation);
-        break;
-      case Offer::Operation::CREATE_VOLUME:
-      case Offer::Operation::DESTROY_VOLUME:
-      case Offer::Operation::CREATE_BLOCK:
-      case Offer::Operation::DESTROY_BLOCK:
-        // TODO(jieyu): Add implementations here.
-        break;
-      case Offer::Operation::UNKNOWN:
-        UNREACHABLE();
-        break;
+  vector<ResourceConversion> strippedConversions;
+  foreach (const ResourceConversion& conversion, conversions) {
+    // TODO(jieyu): Ideally, we should make sure agent's total
+    // resources are consistent with agent's allocation in terms of
+    // shared resources. In other words, we should increase agent's
+    // total resources as well for those additional allocation we did
+    // for shared resources. However, that means we need to update the
+    // agent's total resources when performing allocation for shared
+    // resources (in `__allocate()`). For now, we detect "additional"
+    // allocation for shared resources by checking if a conversion has
+    // an empty `consumed` field.
+    if (conversion.consumed.empty()) {
+      continue;
     }
+
+    Resources consumed = conversion.consumed;
+    Resources converted = conversion.converted;
+
+    consumed.unallocate();
+    converted.unallocate();
+
+    strippedConversions.emplace_back(consumed, converted);
   }
 
-  Try<Resources> updatedTotal = slave.total.apply(strippedOperations);
+  Try<Resources> updatedTotal = slave.total.apply(strippedConversions);
   CHECK_SOME(updatedTotal);
 
   updateSlaveTotal(slaveId, updatedTotal.get());
@@ -919,7 +838,7 @@ void HierarchicalAllocatorProcess::updateAllocation(
   frameworkSorter->add(slaveId, updatedOfferedResources);
 
   // Check that the unreserved quantities for framework allocations
-  // have not changed by the above operations.
+  // have not changed by the above resource conversions.
   const Resources updatedFrameworkAllocation =
     frameworkSorter->allocation(frameworkId.value(), slaveId);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index c234605..2f7755a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -164,7 +164,7 @@ public:
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       const Resources& offeredResources,
-      const std::vector<Offer::Operation>& operations);
+      const std::vector<ResourceConversion>& conversions);
 
   process::Future<Nothing> updateAvailable(
       const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 831aaac..2d340c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4475,16 +4475,18 @@ void Master::_accept(
   // the LAUNCH case below.
   Resources offeredSharedResources = offeredResources.shared();
 
-  // Maintain a list of operations to pass to the allocator.
-  // Note that this list could be different than `accept.operations()`
-  // because:
+  // Maintain a list of resource conversions to pass to the allocator
+  // as a result of offer operations. Note that:
   // 1) We drop invalid operations.
-  // 2) For LAUNCH operations we change the operation to drop invalid tasks.
-  // 3) We don't pass LAUNCH_GROUP to the allocator as we don't currently
-  //    support use cases that require the allocator to be aware of it.
+  // 2) For LAUNCH operations, we drop invalid tasks. LAUNCH operation
+  //    will result in resource conversions because of shared
+  //    resources.
+  // 3) Currently, LAUNCH_GROUP won't result in resource conversions
+  //    because shared resources are not supported yet if the
+  //    framework uses LAUNCH_GROUP operation.
   //
-  // The operation order should remain unchanged.
-  vector<Offer::Operation> operations;
+  // The order of the conversions is important and preserved.
+  vector<ResourceConversion> conversions;
 
   // The order of `authorizations` must match the order of the operations in
   // `accept.operations()`, as they are iterated through simultaneously.
@@ -4538,7 +4540,15 @@ void Master::_accept(
         }
 
         // Test the given operation on the included resources.
-        Try<Resources> resources = _offeredResources.apply(operation);
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(_conversions.get());
         if (resources.isError()) {
           drop(framework, operation, resources.error());
           continue;
@@ -4552,7 +4562,10 @@ void Master::_accept(
 
         _apply(slave, operation);
 
-        operations.push_back(operation);
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
 
         break;
       }
@@ -4593,7 +4606,15 @@ void Master::_accept(
         }
 
         // Test the given operation on the included resources.
-        Try<Resources> resources = _offeredResources.apply(operation);
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(_conversions.get());
         if (resources.isError()) {
           drop(framework, operation, resources.error());
           continue;
@@ -4607,7 +4628,10 @@ void Master::_accept(
 
         _apply(slave, operation);
 
-        operations.push_back(operation);
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
 
         break;
       }
@@ -4657,7 +4681,16 @@ void Master::_accept(
           continue;
         }
 
-        Try<Resources> resources = _offeredResources.apply(operation);
+        // Test the given operation on the included resources.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(_conversions.get());
         if (resources.isError()) {
           drop(framework, operation, resources.error());
           continue;
@@ -4672,7 +4705,10 @@ void Master::_accept(
 
         _apply(slave, operation);
 
-        operations.push_back(operation);
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
 
         break;
       }
@@ -4738,7 +4774,16 @@ void Master::_accept(
           }
         }
 
-        Try<Resources> resources = _offeredResources.apply(operation);
+        // Test the given operation on the included resources.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(_conversions.get());
         if (resources.isError()) {
           drop(framework, operation, resources.error());
           continue;
@@ -4753,18 +4798,15 @@ void Master::_accept(
 
         _apply(slave, operation);
 
-        operations.push_back(operation);
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
 
         break;
       }
 
       case Offer::Operation::LAUNCH: {
-        // For the LAUNCH operation we drop invalid tasks. Therefore
-        // we create a new copy with only the valid tasks to pass to
-        // the allocator.
-        Offer::Operation _operation;
-        _operation.set_type(Offer::Operation::LAUNCH);
-
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
           Future<bool> authorization = authorizations.front();
           authorizations.pop_front();
@@ -4868,6 +4910,29 @@ void Master::_accept(
             CHECK(available.contains(consumed))
               << available << " does not contain " << consumed;
 
+            // Determine the additional instances of shared resources
+            // needed to be added to the allocations since we support
+            // tasks requesting more instances of shared resources
+            // than those being offered.
+            const Resources& consumedShared = consumed.shared();
+
+            // Check that offered resources contain at least one copy
+            // of each consumed shared resource (guaranteed by master
+            // validation).
+            foreach (const Resource& resource, consumedShared) {
+              CHECK(offeredSharedResources.contains(resource));
+            }
+
+            Resources additional = consumedShared - _offeredResources.shared();
+            if (!additional.empty()) {
+              LOG(INFO) << "Allocating additional resources " << additional
+                        << " for task " << task.task_id()
+                        << " of framework " << *framework
+                        << " on agent " << *slave;
+
+              conversions.emplace_back(Resources(), additional);
+            }
+
             _offeredResources -= consumed;
 
             RunTaskMessage message;
@@ -4912,12 +4977,8 @@ void Master::_accept(
 
             send(slave->pid, message);
           }
-
-          _operation.mutable_launch()->add_task_infos()->CopyFrom(task);
         }
 
-        operations.push_back(_operation);
-
         break;
       }
 
@@ -5299,12 +5360,12 @@ void Master::_accept(
   }
 
   // Update the allocator based on the offer operations.
-  if (!operations.empty()) {
+  if (!conversions.empty()) {
     allocator->updateAllocation(
         frameworkId,
         slaveId,
         offeredResources,
-        operations);
+        conversions);
   }
 
   if (!_offeredResources.empty()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index f1c0d14..6a84f1b 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -427,7 +427,7 @@ public:
       const FrameworkID&,
       const SlaveID&,
       const Resources&,
-      const std::vector<Offer::Operation>&));
+      const std::vector<ResourceConversion>&));
 
   MOCK_METHOD2(updateAvailable, process::Future<Nothing>(
       const SlaveID&,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d73a6162/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 48b48ad..f0f95ba 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -1527,9 +1527,12 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
   create.set_type(Offer::Operation::CREATE);
   create.mutable_create()->add_volumes()->CopyFrom(volume);
 
+  Try<vector<ResourceConversion>> conversions = getResourceConversions(create);
+  ASSERT_SOME(conversions);
+
   // Ensure the offer operation can be applied.
   Try<Resources> updated =
-    allocation->resources.at("role1").at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(conversions.get());
 
   ASSERT_SOME(updated);
 
@@ -1538,7 +1541,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
       framework.id(),
       slave.id(),
       allocation->resources.at("role1").at(slave.id()),
-      {create});
+      conversions.get());
 
   // Now recover the resources, and expect the next allocation to
   // contain the updated resources.
@@ -1600,9 +1603,12 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
 
   Offer::Operation create = CREATE(volume);
 
+  Try<vector<ResourceConversion>> conversions = getResourceConversions(create);
+  ASSERT_SOME(conversions);
+
   // Ensure the offer operation can be applied.
   Try<Resources> update =
-    allocation->resources.at("role1").at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(conversions.get());
 
   ASSERT_SOME(update);
 
@@ -1611,7 +1617,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
       framework.id(),
       slave.id(),
       allocation->resources.at("role1").at(slave.id()),
-      {create});
+      conversions.get());
 
   // Now recover the resources, and expect the next allocation to
   // contain the updated resources.
@@ -1636,16 +1642,19 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume)
   // destroy the shared volume.
   Offer::Operation destroy = DESTROY(volume);
 
+  conversions = getResourceConversions(destroy);
+  ASSERT_SOME(conversions);
+
   // Update the allocation in the allocator.
   allocator->updateAllocation(
       framework.id(),
       slave.id(),
       allocation->resources.at("role1").at(slave.id()),
-      {destroy});
+      conversions.get());
 
   // The resources to recover should be equal to the agent's original
   // resources now that the shared volume is created and then destroyed.
-  update = update->apply(destroy);
+  update = update->apply(conversions.get());
   ASSERT_SOME_EQ(allocatedResources(slave.resources(), "role1"), update);
 
   allocator->recoverResources(
@@ -1701,9 +1710,12 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
 
   Offer::Operation create = CREATE(volume);
 
+  Try<vector<ResourceConversion>> conversions = getResourceConversions(create);
+  ASSERT_SOME(conversions);
+
   // Ensure the offer operation can be applied.
   Try<Resources> update =
-    allocation->resources.at("role1").at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(conversions.get());
 
   ASSERT_SOME(update);
 
@@ -1712,7 +1724,7 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability)
       framework1.id(),
       slave.id(),
       allocation->resources.at("role1").at(slave.id()),
-      {create});
+      conversions.get());
 
   // Now recover the resources, and expect the next allocation to
   // contain the updated resources.
@@ -4989,6 +5001,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
   // Create a shared volume.
   Resource volume = createDiskResource(
       "5", "role1", "id1", None(), None(), true);
+
   Offer::Operation create = CREATE(volume);
 
   protobuf::injectAllocationInfo(&create, allocationInfo);
@@ -4998,13 +5011,13 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
       slave.id(),
       Resources::parse("cpus:1;mem:5").get() + volume,
       "echo abc > path1/file");
-  Offer::Operation launch = LAUNCH({task});
 
-  protobuf::injectAllocationInfo(&launch, allocationInfo);
+  Try<vector<ResourceConversion>> conversions = getResourceConversions(create);
+  ASSERT_SOME(conversions);
 
   // Ensure the CREATE operation can be applied.
   Try<Resources> updated =
-    allocation->resources.at("role1").at(slave.id()).apply(create);
+    allocation->resources.at("role1").at(slave.id()).apply(conversions.get());
 
   ASSERT_SOME(updated);
 
@@ -5014,7 +5027,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
       framework1.id(),
       slave.id(),
       allocation->resources.at("role1").at(slave.id()),
-      {create, launch});
+      conversions.get());
 
   // Now recover the resources, and expect the next allocation to contain
   // the updated resources. Note that the volume is not recovered as it is
@@ -5032,7 +5045,7 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources)
       framework2.id(),
       {{"role1", {{slave.id(),
           updated.get() -
-          launch.launch().task_infos(0).resources() +
+          allocatedResources(task.resources(), "role1") +
           create.create().volumes()}}}});
 
   AWAIT_EXPECT_EQ(expected, allocations.get());


[10/14] mesos git commit: Added initial code for offer operation status update in master.

Posted by ji...@apache.org.
Added initial code for offer operation status update in master.

Review: https://reviews.apache.org/r/63485


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f1efb99
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f1efb99
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f1efb99

Branch: refs/heads/master
Commit: 2f1efb99acccf3b76eb1bc30704aa794545076ff
Parents: 8f727c9
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 23:50:21 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 138 +++++++++++++++++++++++++++++++++++++++------
 src/master/master.hpp |  25 +++++++-
 2 files changed, 142 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9ac1861..831aaac 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5145,16 +5145,19 @@ void Master::_accept(
                   << operation.create_volume().source() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5185,16 +5188,19 @@ void Master::_accept(
                   << operation.destroy_volume().volume() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5225,16 +5231,19 @@ void Master::_accept(
                   << operation.create_block().source() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5265,16 +5274,19 @@ void Master::_accept(
                   << operation.destroy_block().block() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -7263,9 +7275,56 @@ void Master::forward(
 
 
 void Master::offerOperationStatusUpdate(
-    const OfferOperationStatusUpdate& message)
+    const OfferOperationStatusUpdate& update)
 {
-  // TODO(jieyu): Provide implementation here.
+  CHECK(update.has_slave_id())
+    << "External resource provider is not supported yet";
+
+  const SlaveID& slaveId = update.slave_id();
+  const FrameworkID& frameworkId = update.framework_id();
+
+  Try<UUID> uuid = UUID::fromString(update.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << update.status().operation_id() << "' "
+               << "from framework " << frameworkId << ": " << uuid.error();
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveId);
+
+  // This is possible if the agent is marked as unreachable or gone,
+  // or has initiated a graceful shutdown. In either of those cases,
+  // ignore the offer operation status update.
+  //
+  // TODO(jieyu): If the agent is unreachable or has initiated a
+  // graceful shutdown, we can still forward the update to the
+  // framework so that the framework can get notified about the offer
+  // operation early. However, the acknowledgement of the update won't
+  // be able to reach the agent in those cases. If the agent is gone,
+  // we cannot forward the update because the master might already
+  // tell the framework that the operation is gone.
+  if (slave == nullptr) {
+    LOG(WARNING) << "Ignoring status update for offer operation '"
+                 << update.status().operation_id() << "' (uuid: "
+                 << uuid->toString() << ") for framework "
+                 << frameworkId << " because agent "
+                 << slaveId << " is not registered";
+    return;
+  }
+
+  OfferOperation* operation = slave->getOfferOperation(uuid.get());
+  if (operation == nullptr) {
+    LOG(ERROR) << "Failed to find the offer operation '"
+               << update.status().operation_id() << "' (uuid: "
+               << uuid->toString() << ") from framework "
+               << frameworkId << " on agent " << slaveId;
+    return;
+  }
+
+  updateOfferOperation(operation, update);
+
+  // TODO(jieyu): Forward the status update to the framework.
 }
 
 
@@ -8426,6 +8485,10 @@ void Master::recoverFramework(
         framework->addExecutor(slave->id, executor);
       }
     }
+
+    foreachvalue (OfferOperation* operation, slave->offerOperations) {
+      framework->addOfferOperation(operation);
+    }
   }
 
   addFramework(framework, suppressedRoles);
@@ -9486,6 +9549,27 @@ void Master::removeExecutor(
 }
 
 
+void Master::addOfferOperation(
+    Framework* framework,
+    Slave* slave,
+    OfferOperation* operation)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK_NOTNULL(operation);
+
+  slave->addOfferOperation(operation);
+  framework->addOfferOperation(operation);
+}
+
+
+void Master::updateOfferOperation(
+    OfferOperation* operation,
+    OfferOperationStatusUpdate update)
+{
+}
+
+
 Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
 {
   CHECK_NOTNULL(slave);
@@ -10371,6 +10455,24 @@ void Slave::removeTask(Task* task)
 }
 
 
+void Slave::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
+{
+  if (offerOperations.contains(uuid)) {
+    return offerOperations.at(uuid);
+  }
+  return nullptr;
+}
+
+
 void Slave::addOffer(Offer* offer)
 {
   CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 52f5576..4c18258 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -146,6 +146,9 @@ struct Slave
 
   void removeTask(Task* task);
 
+  void addOfferOperation(OfferOperation* operation);
+  OfferOperation* getOfferOperation(const UUID& uuid) const;
+
   void addOffer(Offer* offer);
 
   void removeOffer(Offer* offer);
@@ -228,6 +231,10 @@ struct Slave
   // This is used for reconciliation when the slave re-registers.
   multihashmap<FrameworkID, TaskID> killedTasks;
 
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates on this agent.
+  hashmap<UUID, OfferOperation*> offerOperations;
+
   // Active offers on this slave.
   hashset<Offer*> offers;
 
@@ -475,7 +482,7 @@ public:
       const std::vector<TaskStatus>& statuses);
 
   void offerOperationStatusUpdate(
-      const OfferOperationStatusUpdate& message);
+      const OfferOperationStatusUpdate& update);
 
   void exitedExecutor(
       const process::UPID& from,
@@ -864,6 +871,18 @@ protected:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
+  // Adds the given offer operation to the framework and the agent.
+  void addOfferOperation(
+      Framework* framework,
+      Slave* slave,
+      OfferOperation* operation);
+
+  // Transitions the offer operation, and recovers resources if the
+  // offer operation becomes terminal.
+  void updateOfferOperation(
+      OfferOperation* operation,
+      OfferOperationStatusUpdate update);
+
   // Attempts to update the allocator by applying the given operation.
   // If successful, updates the slave's resources, sends a
   // 'CheckpointResourcesMessage' to the slave with the updated
@@ -2731,7 +2750,7 @@ struct Framework
     }
   }
 
-  void addOfferOperation(process::Owned<OfferOperation> operation)
+  void addOfferOperation(OfferOperation* operation)
   {
     Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
     CHECK_SOME(uuid);
@@ -3000,7 +3019,7 @@ struct Framework
 
   // Pending operations or terminal operations that have
   // unacknowledged status updates.
-  hashmap<UUID, process::Owned<OfferOperation>> offerOperations;
+  hashmap<UUID, OfferOperation*> offerOperations;
 
   // The map from the framework-specified operation ID to the
   // corresponding internal operation UUID.


[03/14] mesos git commit: Changed slave_id field to be optional in OfferOperationStatusUpdate.

Posted by ji...@apache.org.
Changed slave_id field to be optional in OfferOperationStatusUpdate.

We plan to reuse this message for communication from the resource
provider manager in the master to the master as well.

Review: https://reviews.apache.org/r/63484


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8f727c91
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8f727c91
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8f727c91

Branch: refs/heads/master
Commit: 8f727c91aeb3f303b3bcfd496d3a7bfe2e7b050c
Parents: 59646a6
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 12:40:20 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/messages/messages.proto | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8f727c91/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 1610c2b..3244c1f 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -686,14 +686,16 @@ message UpdateSlaveMessage {
 
 
 /**
- * This message is sent from the agent to the master to update the
- * status of an offer operation.
+ * This message is sent from the resource provider manager (either on
+ * the agent for local resource providers, or on the master for
+ * external resource providers) to update the status of an offer
+ * operation.
  *
  * See resource_provider::Call::UPDATE_OFFER_OPERATION_STATUS.
  */
 message OfferOperationStatusUpdate {
   required FrameworkID framework_id = 1;
-  required SlaveID slave_id = 2;
+  optional SlaveID slave_id = 2;
   required OfferOperationStatus status = 3;
   optional OfferOperationStatus latest_status = 4;
 


[14/14] mesos git commit: Added agent ID to offer operation protobuf helper.

Posted by ji...@apache.org.
Added agent ID to offer operation protobuf helper.

The current code expects an agent ID to be set for every instance of
'OfferOperation'. The absence of an agent ID would indicate an operation
meant for an external resource provider which isn't supported yet.

Review: https://reviews.apache.org/r/63584/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2619824c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2619824c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2619824c

Branch: refs/heads/master
Commit: 2619824c9c4157b254157eff76c98fce63d36e71
Parents: 0a30e9e
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 6 14:38:02 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:49:07 2017 -0800

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp |  4 +++-
 src/common/protobuf_utils.hpp |  3 ++-
 src/master/master.cpp         | 15 ++++++++++-----
 3 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2619824c/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index de18cea..7a4b87b 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -443,10 +443,12 @@ OfferOperationStatus createOfferOperationStatus(
 OfferOperation createOfferOperation(
     const Offer::Operation& info,
     const OfferOperationStatus& latestStatus,
-    const FrameworkID& frameworkId)
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId)
 {
   OfferOperation operation;
   operation.mutable_framework_id()->CopyFrom(frameworkId);
+  operation.mutable_slave_id()->CopyFrom(slaveId);
   operation.mutable_info()->CopyFrom(info);
   operation.mutable_latest_status()->CopyFrom(latestStatus);
   operation.set_operation_uuid(UUID::random().toBytes());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2619824c/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 5e476be..95f57da 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -163,7 +163,8 @@ OfferOperationStatus createOfferOperationStatus(
 OfferOperation createOfferOperation(
     const Offer::Operation& info,
     const OfferOperationStatus& latestStatus,
-    const FrameworkID& frameworkId);
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId);
 
 
 // Helper function that creates a MasterInfo from UPID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2619824c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c7aadb1..50f2592 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5210,7 +5210,8 @@ void Master::_accept(
             protobuf::createOfferOperation(
                 operation,
                 protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
-                frameworkId));
+                frameworkId,
+                slaveId));
 
         addOfferOperation(framework, slave, offerOperation);
 
@@ -5253,7 +5254,8 @@ void Master::_accept(
             protobuf::createOfferOperation(
                 operation,
                 protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
-                frameworkId));
+                frameworkId,
+                slaveId));
 
         addOfferOperation(framework, slave, offerOperation);
 
@@ -5296,7 +5298,8 @@ void Master::_accept(
             protobuf::createOfferOperation(
                 operation,
                 protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
-                frameworkId));
+                frameworkId,
+                slaveId));
 
         addOfferOperation(framework, slave, offerOperation);
 
@@ -5339,7 +5342,8 @@ void Master::_accept(
             protobuf::createOfferOperation(
                 operation,
                 protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
-                frameworkId));
+                frameworkId,
+                slaveId));
 
         addOfferOperation(framework, slave, offerOperation);
 
@@ -9868,7 +9872,8 @@ void Master::_apply(
         protobuf::createOfferOperation(
             operation,
             protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
-            framework->id()));
+            framework->id(),
+            slave->id));
 
     addOfferOperation(framework, slave, offerOperation);
 


[07/14] mesos git commit: Updated the comment about slaves.removed in master.

Posted by ji...@apache.org.
Updated the comment about slaves.removed in master.

Review: https://reviews.apache.org/r/63432


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb5e65cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb5e65cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb5e65cf

Branch: refs/heads/master
Commit: eb5e65cf693d9b5bee9425b435b7b189d0bfc6f1
Parents: 97062ac
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Oct 30 23:46:56 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/master/master.hpp | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eb5e65cf/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index afcc2e4..52f5576 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1855,8 +1855,9 @@ private:
     hashset<SlaveID> markingGone;
 
     // This collection includes agents that have gracefully shutdown,
-    // as well as those that have been marked unreachable. We keep a
-    // cache here to prevent this from growing in an unbounded manner.
+    // as well as those that have been marked unreachable or gone. We
+    // keep a cache here to prevent this from growing in an unbounded
+    // manner.
     //
     // TODO(bmahler): Ideally we could use a cache with set semantics.
     //


[13/14] mesos git commit: Introduced ResourceConversion to represent conversion to Resources.

Posted by ji...@apache.org.
Introduced ResourceConversion to represent conversion to Resources.

Currently, we couple offer operations with resource conversions. For
instance, we have interfaces like `Resources::apply` that take an
offer operation. This is less than ideal because, with new offer
operations like `CREATE_VOLUME` being introduced, an offer operation
might no longer represent a `conversion`.

This patch decouples resource conversions from offer operations.
`Resources::apply` now takes a `ResourceConversion` object. We
also provide some helpers to get a `ResourceConversion` from an offer
operation (if supported).

Review: https://reviews.apache.org/r/63511


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e55309ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e55309ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e55309ad

Branch: refs/heads/master
Commit: e55309ad8274164e23782a5cb53de93ffa0bc24e
Parents: 2f1efb9
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Nov 2 14:19:19 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/resources.hpp    |  60 +++++--
 include/mesos/v1/resources.hpp |  60 +++++--
 src/common/resources.cpp       | 306 ++++++------------------------------
 src/common/resources_utils.cpp | 131 +++++++++++++++
 src/common/resources_utils.hpp |  17 ++
 src/v1/resources.cpp           | 306 ++++++------------------------------
 6 files changed, 332 insertions(+), 548 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 53152b3..08c544d 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -36,6 +36,7 @@
 #include <stout/hashmap.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
@@ -53,6 +54,10 @@
 
 namespace mesos {
 
+// Forward declaration.
+class ResourceConversion;
+
+
 // NOTE: Resource objects stored in the class are always valid, are in
 // the "post-reservation-refinement" format, and kept combined if possible.
 // It is the caller's responsibility to validate any Resource object or
@@ -464,26 +469,30 @@ public:
   // example frameworks to leverage.
   Option<Resources> find(const Resources& targets) const;
 
-  // Certain offer operations alter the offered resources. The
-  // following methods provide a convenient way to get the transformed
-  // resources by applying the given offer operation(s). Returns an
-  // Error if the offer operation(s) cannot be applied.
-  Try<Resources> apply(
-      const Offer::Operation& operation,
-      const Option<Resources>& convertedResources = None()) const;
+  // Applies a resource conversion by taking out the `consumed`
+  // resources and adding back the `converted` resources. Returns an
+  // Error if the conversion cannot be applied.
+  Try<Resources> apply(const ResourceConversion& conversion) const;
+
+  // Obtains the conversion from the given operation and applies the
+  // conversion. This method serves as syntactic sugar for applying a
+  // resource conversion.
+  // TODO(jieyu): Consider removing this method once we have updated
+  // all the call sites.
+  Try<Resources> apply(const Offer::Operation& operation) const;
 
   template <typename Iterable>
-  Try<Resources> apply(const Iterable& operations) const
+  Try<Resources> apply(const Iterable& iterable) const
   {
     Resources result = *this;
 
-    foreach (const Offer::Operation& operation, operations) {
-      Try<Resources> transformed = result.apply(operation);
-      if (transformed.isError()) {
-        return Error(transformed.error());
+    foreach (const auto& t, iterable) {
+      Try<Resources> converted = result.apply(t);
+      if (converted.isError()) {
+        return Error(converted.error());
       }
 
-      result = transformed.get();
+      result = converted.get();
     }
 
     return result;
@@ -663,6 +672,31 @@ hashmap<Key, Resources> operator+(
   return result;
 }
 
+
+/**
+ * Represents a resource conversion, usually as a result of an offer
+ * operation. See more details in `Resources::apply` method.
+ */
+class ResourceConversion
+{
+public:
+  typedef lambda::function<Try<Nothing>(const Resources&)> PostValidation;
+
+  ResourceConversion(
+      const Resources& _consumed,
+      const Resources& _converted,
+      const Option<PostValidation>& _postValidation = None())
+    : consumed(_consumed),
+      converted(_converted),
+      postValidation(_postValidation) {}
+
+  Try<Resources> apply(const Resources& resources) const;
+
+  Resources consumed;
+  Resources converted;
+  Option<PostValidation> postValidation;
+};
+
 } // namespace mesos {
 
 #endif // __RESOURCES_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/include/mesos/v1/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resources.hpp b/include/mesos/v1/resources.hpp
index 6c2191c..d59fa35 100644
--- a/include/mesos/v1/resources.hpp
+++ b/include/mesos/v1/resources.hpp
@@ -35,6 +35,7 @@
 #include <stout/hashmap.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
@@ -53,6 +54,10 @@
 namespace mesos {
 namespace v1 {
 
+// Forward declaration.
+class ResourceConversion;
+
+
 // NOTE: Resource objects stored in the class are always valid, are in
 // the "post-reservation-refinement" format, and kept combined if possible.
 // It is the caller's responsibility to validate any Resource object or
@@ -464,26 +469,30 @@ public:
   // example frameworks to leverage.
   Option<Resources> find(const Resources& targets) const;
 
-  // Certain offer operations alter the offered resources. The
-  // following methods provide a convenient way to get the transformed
-  // resources by applying the given offer operation(s). Returns an
-  // Error if the offer operation(s) cannot be applied.
-  Try<Resources> apply(
-      const Offer::Operation& operation,
-      const Option<Resources>& convertedResources = None()) const;
+  // Applies a resource conversion by taking out the `consumed`
+  // resources and adding back the `converted` resources. Returns an
+  // Error if the conversion cannot be applied.
+  Try<Resources> apply(const ResourceConversion& conversion) const;
+
+  // Obtains the conversion from the given operation and applies the
+  // conversion. This method serves as syntactic sugar for applying a
+  // resource conversion.
+  // TODO(jieyu): Consider removing this method once we have updated
+  // all the call sites.
+  Try<Resources> apply(const Offer::Operation& operation) const;
 
   template <typename Iterable>
-  Try<Resources> apply(const Iterable& operations) const
+  Try<Resources> apply(const Iterable& iterable) const
   {
     Resources result = *this;
 
-    foreach (const Offer::Operation& operation, operations) {
-      Try<Resources> transformed = result.apply(operation);
-      if (transformed.isError()) {
-        return Error(transformed.error());
+    foreach (const auto& t, iterable) {
+      Try<Resources> converted = result.apply(t);
+      if (converted.isError()) {
+        return Error(converted.error());
       }
 
-      result = transformed.get();
+      result = converted.get();
     }
 
     return result;
@@ -663,6 +672,31 @@ hashmap<Key, Resources> operator+(
   return result;
 }
 
+
+/**
+ * Represents a resource conversion, usually as a result of an offer
+ * operation. See more details in `Resources::apply` method.
+ */
+class ResourceConversion
+{
+public:
+  typedef lambda::function<Try<Nothing>(const Resources&)> PostValidation;
+
+  ResourceConversion(
+      const Resources& _consumed,
+      const Resources& _converted,
+      const Option<PostValidation>& _postValidation = None())
+    : consumed(_consumed),
+      converted(_converted),
+      postValidation(_postValidation) {}
+
+  Try<Resources> apply(const Resources& resources) const;
+
+  Resources consumed;
+  Resources converted;
+  Option<PostValidation> postValidation;
+};
+
 } // namespace v1 {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 914d3e2..4ccfdc1 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -1623,277 +1623,35 @@ Option<Resources> Resources::find(const Resources& targets) const
 }
 
 
-Try<Resources> Resources::apply(
-    const Offer::Operation& operation,
-    const Option<Resources>& convertedResources) const
+Try<Resources> Resources::apply(const ResourceConversion& conversion) const
 {
-  Resources result = *this;
-
-  switch (operation.type()) {
-    case Offer::Operation::LAUNCH:
-      return Error("Cannot apply LAUNCH Operation");
-
-    case Offer::Operation::LAUNCH_GROUP:
-      return Error("Cannot apply LAUNCH_GROUP Operation");
-
-    case Offer::Operation::RESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for RESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.reserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid RESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.reserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid RESERVE Operation: Resource must be reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid RESERVE Operation: Resource must be"
-              " dynamically reserved");
-        }
-
-        // Note that we only allow "pushing" a single reservation at time.
-        Resources resources = Resources(reserved).popReservation();
-
-        if (!result.contains(resources)) {
-          return Error("Invalid RESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(resources));
-        }
-
-        result -= resources;
-        result.add(reserved);
-      }
-      break;
-    }
-
-    case Offer::Operation::UNRESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for UNRESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.unreserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid UNRESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.unreserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid UNRESERVE Operation: Resource is not reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid UNRESERVE Operation: Resource is not"
-              " dynamically reserved");
-        }
-
-        if (!result.contains(reserved)) {
-          return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
-        }
-
-        // Note that we only allow "popping" a single reservation at time.
-        Resources resources = Resources(reserved).popReservation();
-
-        result.subtract(reserved);
-        result += resources;
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for CREATE Operation");
-      }
-
-      Option<Error> error = validate(operation.create().volumes());
-      if (error.isSome()) {
-        return Error("Invalid CREATE Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.create().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid CREATE Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid CREATE Operation: Missing 'persistence'");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        // TODO(jieyu): Non-persistent volumes are not supported for
-        // now. Persistent volumes can only be be created from regular
-        // disk resources. Revisit this once we start to support
-        // non-persistent volumes.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, the
-        // original resource must be non-shared.
-        stripped.clear_shared();
-
-        if (!result.contains(stripped)) {
-          return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
-        }
-
-        result.subtract(stripped);
-        result.add(volume);
-      }
-      break;
-    }
-
-    case Offer::Operation::DESTROY: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for DESTROY Operation");
-      }
-
-      Option<Error> error = validate(operation.destroy().volumes());
-      if (error.isSome()) {
-        return Error("Invalid DESTROY Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.destroy().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid DESTROY Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid DESTROY Operation: Missing 'persistence'");
-        }
-
-        if (!result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume does not exist");
-        }
-
-        result.subtract(volume);
-
-        if (result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume " +
-              stringify(volume) + " cannot be removed due to additional " +
-              "shared copies");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, we
-        // return the resource to non-shared state after destroy.
-        stripped.clear_shared();
-
-        result.add(stripped);
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.create_volume().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.destroy_volume().volume();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::CREATE_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_BLOCK Operation");
-      }
-
-      const Resource& consumed = operation.create_block().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_BLOCK Operation");
-      }
+  return conversion.apply(*this);
+}
 
-      const Resource& consumed = operation.destroy_block().block();
 
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
+Try<Resources> Resources::apply(const Offer::Operation& operation) const
+{
+  Try<vector<ResourceConversion>> conversions =
+    getResourceConversions(operation);
 
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
+  if (conversions.isError()) {
+    return Error("Cannot get conversions: " + conversions.error());
+  }
 
-    case Offer::Operation::UNKNOWN:
-      return Error("Unknown offer operation");
+  Try<Resources> result = apply(conversions.get());
+  if (result.isError()) {
+    return Error(result.error());
   }
 
-  // The following are sanity checks to ensure the amount of each type of
-  // resource does not change.
+  // The following are sanity checks to ensure the amount of each type
+  // of resource does not change.
   // TODO(jieyu): Currently, we only check known resource types like
   // cpus, gpus, mem, disk, ports, etc. We should generalize this.
-
-  CHECK(result.cpus() == cpus());
-  CHECK(result.gpus() == gpus());
-  CHECK(result.mem() == mem());
-  CHECK(result.disk() == disk());
-  CHECK(result.ports() == ports());
+  CHECK(result->cpus() == cpus());
+  CHECK(result->gpus() == gpus());
+  CHECK(result->mem() == mem());
+  CHECK(result->disk() == disk());
+  CHECK(result->ports() == ports());
 
   return result;
 }
@@ -2525,4 +2283,28 @@ ostream& operator<<(
   return stream << JSON::protobuf(resources);
 }
 
+
+Try<Resources> ResourceConversion::apply(const Resources& resources) const
+{
+  Resources result = resources;
+
+  if (!result.contains(consumed)) {
+    return Error(
+        stringify(result) + " does not contain " +
+        stringify(consumed));
+  }
+
+  result -= consumed;
+  result += converted;
+
+  if (postValidation.isSome()) {
+    Try<Nothing> validation = postValidation.get()(result);
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
+  }
+
+  return result;
+}
+
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 8304da4..7c48704 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -19,6 +19,8 @@
 
 #include "common/resources_utils.hpp"
 
+using std::vector;
+
 using google::protobuf::RepeatedPtrField;
 
 namespace mesos {
@@ -91,6 +93,135 @@ Try<Resources> applyCheckpointedResources(
 }
 
 
+namespace internal {
+
+// NOTE: Use template here so that it works for both internal and v1.
+template <typename TResources,
+          typename TResource,
+          typename TResourceConversion,
+          typename TOfferOperation>
+Try<vector<TResourceConversion>> getResourceConversions(
+    const TOfferOperation& operation)
+{
+  vector<TResourceConversion> conversions;
+
+  switch (operation.type()) {
+    case TOfferOperation::UNKNOWN:
+      return Error("Unknown offer operation");
+
+    case TOfferOperation::LAUNCH:
+    case TOfferOperation::LAUNCH_GROUP:
+    case TOfferOperation::CREATE_VOLUME:
+    case TOfferOperation::DESTROY_VOLUME:
+    case TOfferOperation::CREATE_BLOCK:
+    case TOfferOperation::DESTROY_BLOCK:
+      return Error("Offer operation not supported");
+
+    case TOfferOperation::RESERVE: {
+      foreach (const TResource& reserved, operation.reserve().resources()) {
+        // Note that we only allow "pushing" a single reservation at a time.
+        TResources consumed = TResources(reserved).popReservation();
+        conversions.emplace_back(consumed, reserved);
+      }
+      break;
+    }
+
+    case TOfferOperation::UNRESERVE: {
+      foreach (const TResource& reserved, operation.unreserve().resources()) {
+        // Note that we only allow "popping" a single reservation at a time.
+        TResources converted = TResources(reserved).popReservation();
+        conversions.emplace_back(reserved, converted);
+      }
+      break;
+    }
+
+    case TOfferOperation::CREATE: {
+      foreach (const TResource& volume, operation.create().volumes()) {
+        // Strip persistence and volume from the disk info so that we
+        // can subtract it from the original resources.
+        // TODO(jieyu): Non-persistent volumes are not supported for
+        // now. Persistent volumes can only be created from regular
+        // disk resources. Revisit this once we start to support
+        // non-persistent volumes.
+        TResource stripped = volume;
+
+        if (stripped.disk().has_source()) {
+          stripped.mutable_disk()->clear_persistence();
+          stripped.mutable_disk()->clear_volume();
+        } else {
+          stripped.clear_disk();
+        }
+
+        // Since we only allow persistent volumes to be shared, the
+        // original resource must be non-shared.
+        stripped.clear_shared();
+
+        conversions.emplace_back(stripped, volume);
+      }
+      break;
+    }
+
+    case TOfferOperation::DESTROY: {
+      foreach (const TResource& volume, operation.destroy().volumes()) {
+        // Strip persistence and volume from the disk info to obtain
+        // the resource that the destroyed volume is converted back to.
+        TResource stripped = volume;
+
+        if (stripped.disk().has_source()) {
+          stripped.mutable_disk()->clear_persistence();
+          stripped.mutable_disk()->clear_volume();
+        } else {
+          stripped.clear_disk();
+        }
+
+        // Since we only allow persistent volumes to be shared, we
+        // return the resource to non-shared state after destroy.
+        stripped.clear_shared();
+
+        conversions.emplace_back(
+            volume,
+            stripped,
+            [volume](const TResources& resources) -> Try<Nothing> {
+              if (resources.contains(volume)) {
+                return Error(
+                  "Persistent volume " + stringify(volume) + " cannot be "
+                  "removed due to additional shared copies");
+              }
+              return Nothing();
+            });
+      }
+      break;
+    }
+  }
+
+  return conversions;
+}
+
+} // namespace internal {
+
+
+Try<vector<ResourceConversion>> getResourceConversions(
+    const Offer::Operation& operation)
+{
+  return internal::getResourceConversions<
+      Resources,
+      Resource,
+      ResourceConversion,
+      Offer::Operation>(operation);
+}
+
+
+Try<vector<v1::ResourceConversion>> getResourceConversions(
+    const v1::Offer::Operation& operation)
+{
+  return internal::getResourceConversions<
+      v1::Resources,
+      v1::Resource,
+      v1::ResourceConversion,
+      v1::Offer::Operation>(operation);
+}
+
+
 Result<ResourceProviderID> getResourceProviderId(
     const Offer::Operation& operation)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index b8c29b3..5b74ff2 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -24,6 +24,9 @@
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
 #include <stout/error.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
@@ -55,6 +58,20 @@ Result<ResourceProviderID> getResourceProviderId(
     const Offer::Operation& operation);
 
 
+// Returns the resource conversions from the given offer operation.
+// This helper assumes that the given operation has already been
+// validated.
+Try<std::vector<ResourceConversion>> getResourceConversions(
+    const Offer::Operation& operation);
+
+
+// Returns the resource conversions from the given offer operation.
+// This helper assumes that the given operation has already been
+// validated.
+Try<std::vector<v1::ResourceConversion>> getResourceConversions(
+    const v1::Offer::Operation& operation);
+
+
 // Resource format options to be used with the `convertResourceFormat` function.
 //
 // The preconditions of the options are asymmetric, centered around the

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 58568b8..43d9b0f 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -39,6 +39,8 @@
 #include <stout/strings.hpp>
 #include <stout/unreachable.hpp>
 
+#include "common/resources_utils.hpp"
+
 using std::map;
 using std::ostream;
 using std::set;
@@ -1654,275 +1656,35 @@ Option<Resources> Resources::find(const Resources& targets) const
 }
 
 
-Try<Resources> Resources::apply(
-    const Offer::Operation& operation,
-    const Option<Resources>& convertedResources) const
+Try<Resources> Resources::apply(const ResourceConversion& conversion) const
 {
-  Resources result = *this;
-
-  switch (operation.type()) {
-    case Offer::Operation::LAUNCH:
-      return Error("Cannot apply LAUNCH Operation");
-
-    case Offer::Operation::LAUNCH_GROUP:
-      return Error("Cannot apply LAUNCH_GROUP Operation");
-
-    case Offer::Operation::RESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for RESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.reserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid RESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.reserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid RESERVE Operation: Resource must be reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid RESERVE Operation: Resource must be"
-              " dynamically reserved");
-        }
-
-        Resources resources = Resources(reserved).popReservation();
-
-        if (!result.contains(resources)) {
-          return Error("Invalid RESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(resources));
-        }
-
-        result -= resources;
-        result.add(reserved);
-      }
-      break;
-    }
-
-    case Offer::Operation::UNRESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for UNRESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.unreserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid UNRESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.unreserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid UNRESERVE Operation: Resource is not reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid UNRESERVE Operation: Resource is not"
-              " dynamically reserved");
-        }
-
-        if (!result.contains(reserved)) {
-          return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
-        }
-
-        Resources resources = Resources(reserved).popReservation();
-
-        result.subtract(reserved);
-        result += resources;
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for CREATE Operation");
-      }
-
-      Option<Error> error = validate(operation.create().volumes());
-      if (error.isSome()) {
-        return Error("Invalid CREATE Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.create().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid CREATE Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid CREATE Operation: Missing 'persistence'");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        // TODO(jieyu): Non-persistent volumes are not supported for
-        // now. Persistent volumes can only be be created from regular
-        // disk resources. Revisit this once we start to support
-        // non-persistent volumes.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, the
-        // original resource must be non-shared.
-        stripped.clear_shared();
-
-        if (!result.contains(stripped)) {
-          return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
-        }
-
-        result.subtract(stripped);
-        result.add(volume);
-      }
-      break;
-    }
-
-    case Offer::Operation::DESTROY: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for DESTROY Operation");
-      }
-
-      Option<Error> error = validate(operation.destroy().volumes());
-      if (error.isSome()) {
-        return Error("Invalid DESTROY Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.destroy().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid DESTROY Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid DESTROY Operation: Missing 'persistence'");
-        }
-
-        if (!result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume does not exist");
-        }
-
-        result.subtract(volume);
-
-        if (result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume " +
-              stringify(volume) + " cannot be removed due to additional " +
-              "shared copies");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, we
-        // return the resource to non-shared state after destroy.
-        stripped.clear_shared();
-
-        result.add(stripped);
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.create_volume().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.destroy_volume().volume();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::CREATE_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_BLOCK Operation");
-      }
-
-      const Resource& consumed = operation.create_block().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_BLOCK Operation");
-      }
+  return conversion.apply(*this);
+}
 
-      const Resource& consumed = operation.destroy_block().block();
 
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
+Try<Resources> Resources::apply(const Offer::Operation& operation) const
+{
+  Try<vector<ResourceConversion>> conversions =
+    getResourceConversions(operation);
 
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
+  if (conversions.isError()) {
+    return Error("Cannot get conversions: " + conversions.error());
+  }
 
-    case Offer::Operation::UNKNOWN:
-      return Error("Unknown offer operation");
+  Try<Resources> result = apply(conversions.get());
+  if (result.isError()) {
+    return Error(result.error());
   }
 
-  // The following are sanity checks to ensure the amount of each type of
-  // resource does not change.
+  // The following are sanity checks to ensure the amount of each type
+  // of resource does not change.
   // TODO(jieyu): Currently, we only check known resource types like
   // cpus, gpus, mem, disk, ports, etc. We should generalize this.
-
-  CHECK(result.cpus() == cpus());
-  CHECK(result.gpus() == gpus());
-  CHECK(result.mem() == mem());
-  CHECK(result.disk() == disk());
-  CHECK(result.ports() == ports());
+  CHECK(result->cpus() == cpus());
+  CHECK(result->gpus() == gpus());
+  CHECK(result->mem() == mem());
+  CHECK(result->disk() == disk());
+  CHECK(result->ports() == ports());
 
   return result;
 }
@@ -2554,5 +2316,29 @@ ostream& operator<<(
   return stream << JSON::protobuf(resources);
 }
 
+
+Try<Resources> ResourceConversion::apply(const Resources& resources) const
+{
+  Resources result = resources;
+
+  if (!result.contains(consumed)) {
+    return Error(
+        stringify(result) + " does not contain " +
+        stringify(consumed));
+  }
+
+  result -= consumed;
+  result += converted;
+
+  if (postValidation.isSome()) {
+    Try<Nothing> validation = postValidation.get()(result);
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
+  }
+
+  return result;
+}
+
 } // namespace v1 {
 } // namespace mesos {


[09/14] mesos git commit: Added streaming function for OfferOperationState.

Posted by ji...@apache.org.
Added streaming function for OfferOperationState.

Review: https://reviews.apache.org/r/63482


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/11e36c12
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/11e36c12
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/11e36c12

Branch: refs/heads/master
Commit: 11e36c12e4b5e720a4f5d4ba95260357d7ede950
Parents: c9c2a38
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 12:39:11 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/type_utils.hpp | 5 +++++
 include/mesos/v1/mesos.hpp   | 5 +++++
 src/common/type_utils.cpp    | 6 ++++++
 src/v1/mesos.cpp             | 6 ++++++
 4 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/11e36c12/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 2452396..f7f2327 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -376,6 +376,11 @@ std::ostream& operator<<(
     const OfferOperationID& offerOperationId);
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const OfferOperationState& state);
+
+
 std::ostream& operator<<(std::ostream& stream, const RateLimits& limits);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/11e36c12/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 4b53b5c..26b4596 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -364,6 +364,11 @@ std::ostream& operator<<(
     const OfferOperationID& offerOperationId);
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const OfferOperationState& state);
+
+
 std::ostream& operator<<(std::ostream& stream, const RateLimits& limits);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/11e36c12/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index b989b4d..1426755 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -613,6 +613,12 @@ ostream& operator<<(ostream& stream, const OfferOperationID& offerOperationId)
 }
 
 
+ostream& operator<<(ostream& stream, const OfferOperationState& state)
+{
+  return stream << OfferOperationState_Name(state);
+}
+
+
 ostream& operator<<(ostream& stream, const RateLimits& limits)
 {
   return stream << limits.DebugString();

http://git-wip-us.apache.org/repos/asf/mesos/blob/11e36c12/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index ccb4d2a..44767b1 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -515,6 +515,12 @@ ostream& operator<<(ostream& stream, const OfferOperationID& offerOperationId)
 }
 
 
+ostream& operator<<(ostream& stream, const OfferOperationState& state)
+{
+  return stream << OfferOperationState_Name(state);
+}
+
+
 ostream& operator<<(ostream& stream, const RateLimits& limits)
 {
   return stream << limits.DebugString();