You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/06 23:07:00 UTC

[10/14] mesos git commit: Added initial code for offer operation status update in master.

Added initial code for offer operation status update in master.

Review: https://reviews.apache.org/r/63485


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f1efb99
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f1efb99
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f1efb99

Branch: refs/heads/master
Commit: 2f1efb99acccf3b76eb1bc30704aa794545076ff
Parents: 8f727c9
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 23:50:21 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 138 +++++++++++++++++++++++++++++++++++++++------
 src/master/master.hpp |  25 +++++++-
 2 files changed, 142 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9ac1861..831aaac 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5145,16 +5145,19 @@ void Master::_accept(
                   << operation.create_volume().source() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5185,16 +5188,19 @@ void Master::_accept(
                   << operation.destroy_volume().volume() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5225,16 +5231,19 @@ void Master::_accept(
                   << operation.create_block().source() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -5265,16 +5274,19 @@ void Master::_accept(
                   << operation.destroy_block().block() << " from framework "
                   << *framework << " to agent " << *slave;
 
-        Owned<OfferOperation> offerOperation(new OfferOperation(
-            protobuf::createOfferOperation(operation, frameworkId)));
+        OfferOperation* offerOperation = new OfferOperation(
+            protobuf::createOfferOperation(
+                operation,
+                protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+                frameworkId));
+
+        addOfferOperation(framework, slave, offerOperation);
 
         ApplyOfferOperationMessage message;
         message.mutable_framework_id()->CopyFrom(frameworkId);
         message.mutable_operation_info()->CopyFrom(offerOperation->info());
         message.set_operation_uuid(offerOperation->operation_uuid());
 
-        framework->addOfferOperation(std::move(offerOperation));
-
         send(slave->pid, message);
         break;
       }
@@ -7263,9 +7275,56 @@ void Master::forward(
 
 
 void Master::offerOperationStatusUpdate(
-    const OfferOperationStatusUpdate& message)
+    const OfferOperationStatusUpdate& update)
 {
-  // TODO(jieyu): Provide implementation here.
+  CHECK(update.has_slave_id())
+    << "External resource provider is not supported yet";
+
+  const SlaveID& slaveId = update.slave_id();
+  const FrameworkID& frameworkId = update.framework_id();
+  // Parse as raw bytes, consistent with `Slave::addOfferOperation`.
+  Try<UUID> uuid = UUID::fromBytes(update.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << update.status().operation_id() << "' "
+               << "from framework " << frameworkId << ": " << uuid.error();
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveId);
+
+  // This is possible if the agent is marked as unreachable or gone,
+  // or has initiated a graceful shutdown. In any of those cases,
+  // ignore the offer operation status update.
+  //
+  // TODO(jieyu): If the agent is unreachable or has initiated a
+  // graceful shutdown, we can still forward the update to the
+  // framework so that the framework can get notified about the offer
+  // operation early. However, the acknowledgement of the update won't
+  // be able to reach the agent in those cases. If the agent is gone,
+  // we cannot forward the update because the master might have already
+  // told the framework that the operation is gone.
+  if (slave == nullptr) {
+    LOG(WARNING) << "Ignoring status update for offer operation '"
+                 << update.status().operation_id() << "' (uuid: "
+                 << uuid->toString() << ") for framework "
+                 << frameworkId << " because agent "
+                 << slaveId << " is not registered";
+    return;
+  }
+
+  OfferOperation* operation = slave->getOfferOperation(uuid.get());
+  if (operation == nullptr) {
+    LOG(ERROR) << "Failed to find the offer operation '"
+               << update.status().operation_id() << "' (uuid: "
+               << uuid->toString() << ") from framework "
+               << frameworkId << " on agent " << slaveId;
+    return;
+  }
+
+  updateOfferOperation(operation, update);
+
+  // TODO(jieyu): Forward the status update to the framework.
 }
 
 
@@ -8426,6 +8485,10 @@ void Master::recoverFramework(
         framework->addExecutor(slave->id, executor);
       }
     }
+
+    foreachvalue (OfferOperation* operation, slave->offerOperations) {
+      framework->addOfferOperation(operation);
+    }
   }
 
   addFramework(framework, suppressedRoles);
@@ -9486,6 +9549,27 @@ void Master::removeExecutor(
 }
 
 
+void Master::addOfferOperation(
+    Framework* framework,
+    Slave* slave,
+    OfferOperation* operation)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK_NOTNULL(operation);
+
+  slave->addOfferOperation(operation);
+  framework->addOfferOperation(operation);
+}
+
+
+void Master::updateOfferOperation(
+    OfferOperation* operation,
+    OfferOperationStatusUpdate update)
+{
+}
+
+
 Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
 {
   CHECK_NOTNULL(slave);
@@ -10371,6 +10455,24 @@ void Slave::removeTask(Task* task)
 }
 
 
+void Slave::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
+{
+  if (offerOperations.contains(uuid)) {
+    return offerOperations.at(uuid);
+  }
+  return nullptr;
+}
+
+
 void Slave::addOffer(Offer* offer)
 {
   CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 52f5576..4c18258 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -146,6 +146,9 @@ struct Slave
 
   void removeTask(Task* task);
 
+  void addOfferOperation(OfferOperation* operation);
+  OfferOperation* getOfferOperation(const UUID& uuid) const;
+
   void addOffer(Offer* offer);
 
   void removeOffer(Offer* offer);
@@ -228,6 +231,10 @@ struct Slave
   // This is used for reconciliation when the slave re-registers.
   multihashmap<FrameworkID, TaskID> killedTasks;
 
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates on this agent.
+  hashmap<UUID, OfferOperation*> offerOperations;
+
   // Active offers on this slave.
   hashset<Offer*> offers;
 
@@ -475,7 +482,7 @@ public:
       const std::vector<TaskStatus>& statuses);
 
   void offerOperationStatusUpdate(
-      const OfferOperationStatusUpdate& message);
+      const OfferOperationStatusUpdate& update);
 
   void exitedExecutor(
       const process::UPID& from,
@@ -864,6 +871,18 @@ protected:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
+  // Adds the given offer operation to the framework and the agent.
+  void addOfferOperation(
+      Framework* framework,
+      Slave* slave,
+      OfferOperation* operation);
+
+  // Transitions the offer operation, and recovers resources if the
+  // offer operation becomes terminal.
+  void updateOfferOperation(
+      OfferOperation* operation,
+      OfferOperationStatusUpdate update);
+
   // Attempts to update the allocator by applying the given operation.
   // If successful, updates the slave's resources, sends a
   // 'CheckpointResourcesMessage' to the slave with the updated
@@ -2731,7 +2750,7 @@ struct Framework
     }
   }
 
-  void addOfferOperation(process::Owned<OfferOperation> operation)
+  void addOfferOperation(OfferOperation* operation)
   {
     Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
     CHECK_SOME(uuid);
@@ -3000,7 +3019,7 @@ struct Framework
 
   // Pending operations or terminal operations that have
   // unacknowledged status updates.
-  hashmap<UUID, process::Owned<OfferOperation>> offerOperations;
+  hashmap<UUID, OfferOperation*> offerOperations;
 
   // The map from the framework-specified operation ID to the
   // corresponding internal operation UUID.