You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/06 23:07:00 UTC
[10/14] mesos git commit: Added initial code for offer operation status update in master.
Added initial code for offer operation status update in master.
Review: https://reviews.apache.org/r/63485
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f1efb99
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f1efb99
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f1efb99
Branch: refs/heads/master
Commit: 2f1efb99acccf3b76eb1bc30704aa794545076ff
Parents: 8f727c9
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Nov 1 23:50:21 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 138 +++++++++++++++++++++++++++++++++++++++------
src/master/master.hpp | 25 +++++++-
2 files changed, 142 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9ac1861..831aaac 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5145,16 +5145,19 @@ void Master::_accept(
<< operation.create_volume().source() << " from framework "
<< *framework << " to agent " << *slave;
- Owned<OfferOperation> offerOperation(new OfferOperation(
- protobuf::createOfferOperation(operation, frameworkId)));
+ OfferOperation* offerOperation = new OfferOperation(
+ protobuf::createOfferOperation(
+ operation,
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+ frameworkId));
+
+ addOfferOperation(framework, slave, offerOperation);
ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());
- framework->addOfferOperation(std::move(offerOperation));
-
send(slave->pid, message);
break;
}
@@ -5185,16 +5188,19 @@ void Master::_accept(
<< operation.destroy_volume().volume() << " from framework "
<< *framework << " to agent " << *slave;
- Owned<OfferOperation> offerOperation(new OfferOperation(
- protobuf::createOfferOperation(operation, frameworkId)));
+ OfferOperation* offerOperation = new OfferOperation(
+ protobuf::createOfferOperation(
+ operation,
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+ frameworkId));
+
+ addOfferOperation(framework, slave, offerOperation);
ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());
- framework->addOfferOperation(std::move(offerOperation));
-
send(slave->pid, message);
break;
}
@@ -5225,16 +5231,19 @@ void Master::_accept(
<< operation.create_block().source() << " from framework "
<< *framework << " to agent " << *slave;
- Owned<OfferOperation> offerOperation(new OfferOperation(
- protobuf::createOfferOperation(operation, frameworkId)));
+ OfferOperation* offerOperation = new OfferOperation(
+ protobuf::createOfferOperation(
+ operation,
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+ frameworkId));
+
+ addOfferOperation(framework, slave, offerOperation);
ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());
- framework->addOfferOperation(std::move(offerOperation));
-
send(slave->pid, message);
break;
}
@@ -5265,16 +5274,19 @@ void Master::_accept(
<< operation.destroy_block().block() << " from framework "
<< *framework << " to agent " << *slave;
- Owned<OfferOperation> offerOperation(new OfferOperation(
- protobuf::createOfferOperation(operation, frameworkId)));
+ OfferOperation* offerOperation = new OfferOperation(
+ protobuf::createOfferOperation(
+ operation,
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+ frameworkId));
+
+ addOfferOperation(framework, slave, offerOperation);
ApplyOfferOperationMessage message;
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_operation_info()->CopyFrom(offerOperation->info());
message.set_operation_uuid(offerOperation->operation_uuid());
- framework->addOfferOperation(std::move(offerOperation));
-
send(slave->pid, message);
break;
}
@@ -7263,9 +7275,56 @@ void Master::forward(
void Master::offerOperationStatusUpdate(
- const OfferOperationStatusUpdate& message)
+    const OfferOperationStatusUpdate& update)
{
- // TODO(jieyu): Provide implementation here.
+  // Updates that carry no agent ID would come from an external resource
+  // provider, which is not supported yet. This message is received from
+  // a remote process, so drop it rather than aborting the master.
+  if (!update.has_slave_id()) {
+    LOG(WARNING) << "Ignoring status update for offer operation '"
+                 << update.status().operation_id() << "': "
+                 << "External resource provider is not supported yet";
+    return;
+  }
+
+  const SlaveID& slaveId = update.slave_id();
+  const FrameworkID& frameworkId = update.framework_id();
+
+  // The master keeps operation UUIDs as raw bytes (they are decoded with
+  // `UUID::fromBytes` in `Slave::addOfferOperation`) and copies them
+  // verbatim into `ApplyOfferOperationMessage`. Decode the UUID carried by
+  // the update the same way; `UUID::fromString` expects the textual form.
+  // NOTE(review): assumes the agent echoes `operation_uuid` unchanged.
+  Try<UUID> uuid = UUID::fromBytes(update.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << update.status().operation_id() << "' "
+               << "from framework " << frameworkId << ": " << uuid.error();
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveId);
+
+  // This is possible if the agent is marked as unreachable or gone,
+  // or has initiated a graceful shutdown. In any of those cases,
+  // ignore the offer operation status update.
+  //
+  // TODO(jieyu): If the agent is unreachable or has initiated a
+  // graceful shutdown, we can still forward the update to the
+  // framework so that the framework can get notified about the offer
+  // operation early. However, the acknowledgement of the update won't
+  // be able to reach the agent in those cases. If the agent is gone,
+  // we cannot forward the update because the master might already
+  // have told the framework that the operation is gone.
+  if (slave == nullptr) {
+    LOG(WARNING) << "Ignoring status update for offer operation '"
+                 << update.status().operation_id() << "' (uuid: "
+                 << uuid->toString() << ") for framework "
+                 << frameworkId << " because agent "
+                 << slaveId << " is not registered";
+    return;
+  }
+
+  // Look the operation up in the agent's index of pending operations
+  // (and terminal operations with unacknowledged status updates).
+  OfferOperation* operation = slave->getOfferOperation(uuid.get());
+  if (operation == nullptr) {
+    LOG(ERROR) << "Failed to find the offer operation '"
+               << update.status().operation_id() << "' (uuid: "
+               << uuid->toString() << ") from framework "
+               << frameworkId << " on agent " << slaveId;
+    return;
+  }
+
+  updateOfferOperation(operation, update);
+
+  // TODO(jieyu): Forward the status update to the framework.
}
@@ -8426,6 +8485,10 @@ void Master::recoverFramework(
framework->addExecutor(slave->id, executor);
}
}
+
+ foreachvalue (OfferOperation* operation, slave->offerOperations) {
+ framework->addOfferOperation(operation);
+ }
}
addFramework(framework, suppressedRoles);
@@ -9486,6 +9549,27 @@ void Master::removeExecutor(
}
+// Registers `operation` with both `slave` and `framework`, so that it can
+// later be found from either side. All three pointers must be non-null.
+void Master::addOfferOperation(
+    Framework* framework,
+    Slave* slave,
+    OfferOperation* operation)
+{
+  // Validate all inputs before either index is touched.
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK_NOTNULL(operation);
+
+  // Both indices store the very same pointer: the agent's index is
+  // populated first, then the framework's.
+  slave->addOfferOperation(operation);
+  framework->addOfferOperation(operation);
+}
+
+
+void Master::updateOfferOperation(
+    OfferOperation* operation,
+    OfferOperationStatusUpdate update)
+{
+  // Same precondition style as `Master::addOfferOperation`.
+  CHECK_NOTNULL(operation);
+
+  // TODO(jieyu): Not implemented yet. Until it is, status updates reach
+  // this point and are dropped without transitioning the operation or
+  // recovering its resources.
+  //
+  // NOTE(review): `update` is taken by value, which copies the protobuf
+  // on every call; consider `const OfferOperationStatusUpdate&` here and
+  // in the declaration in master.hpp.
+}
+
+
Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation)
{
CHECK_NOTNULL(slave);
@@ -10371,6 +10455,24 @@ void Slave::removeTask(Task* task)
}
+// Indexes `operation` on this agent by its operation UUID, which is
+// stored as raw bytes in the operation.
+void Slave::addOfferOperation(OfferOperation* operation)
+{
+  CHECK_NOTNULL(operation);
+
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  // Like `Slave::addOffer`, refuse duplicates: the map holds raw
+  // pointers, so a silent overwrite would lose track of the previously
+  // registered operation.
+  CHECK(!offerOperations.contains(uuid.get()))
+    << "Duplicate offer operation " << uuid->toString();
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+// Returns the operation indexed under `uuid` on this agent, or nullptr
+// when no such operation is known.
+OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
+{
+  const auto entry = offerOperations.find(uuid);
+  return entry != offerOperations.end() ? entry->second : nullptr;
+}
+
+
void Slave::addOffer(Offer* offer)
{
CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
http://git-wip-us.apache.org/repos/asf/mesos/blob/2f1efb99/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 52f5576..4c18258 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -146,6 +146,9 @@ struct Slave
void removeTask(Task* task);
+ void addOfferOperation(OfferOperation* operation);
+ OfferOperation* getOfferOperation(const UUID& uuid) const;
+
void addOffer(Offer* offer);
void removeOffer(Offer* offer);
@@ -228,6 +231,10 @@ struct Slave
// This is used for reconciliation when the slave re-registers.
multihashmap<FrameworkID, TaskID> killedTasks;
+ // Pending operations or terminal operations that have
+ // unacknowledged status updates on this agent.
+ hashmap<UUID, OfferOperation*> offerOperations;
+
// Active offers on this slave.
hashset<Offer*> offers;
@@ -475,7 +482,7 @@ public:
const std::vector<TaskStatus>& statuses);
void offerOperationStatusUpdate(
- const OfferOperationStatusUpdate& message);
+ const OfferOperationStatusUpdate& update);
void exitedExecutor(
const process::UPID& from,
@@ -864,6 +871,18 @@ protected:
const FrameworkID& frameworkId,
const ExecutorID& executorId);
+ // Adds the given offer operation to the framework and the agent.
+ void addOfferOperation(
+ Framework* framework,
+ Slave* slave,
+ OfferOperation* operation);
+
+ // Transitions the offer operation, and recovers resources if the
+ // offer operation becomes terminal.
+ void updateOfferOperation(
+ OfferOperation* operation,
+ OfferOperationStatusUpdate update);
+
// Attempts to update the allocator by applying the given operation.
// If successful, updates the slave's resources, sends a
// 'CheckpointResourcesMessage' to the slave with the updated
@@ -2731,7 +2750,7 @@ struct Framework
}
}
- void addOfferOperation(process::Owned<OfferOperation> operation)
+ void addOfferOperation(OfferOperation* operation)
{
Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
CHECK_SOME(uuid);
@@ -3000,7 +3019,7 @@ struct Framework
// Pending operations or terminal operations that have
// unacknowledged status updates.
- hashmap<UUID, process::Owned<OfferOperation>> offerOperations;
+ hashmap<UUID, OfferOperation*> offerOperations;
// The map from the framework-specified operation ID to the
// corresponding internal operation UUID.