You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/03/23 07:56:59 UTC

[04/13] mesos git commit: Implemented the master's `ACKNOWLEDGE_OPERATION_STATUS` handler.

Implemented the master's `ACKNOWLEDGE_OPERATION_STATUS` handler.

Review: https://reviews.apache.org/r/64618/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/89ace2cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/89ace2cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/89ace2cc

Branch: refs/heads/master
Commit: 89ace2cc35d46a0e3965571b6bf1233b9aa36e13
Parents: 990e4ea
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 22 21:52:32 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Mar 22 22:00:13 2018 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |   5 +-
 src/master/master.cpp | 114 ++++++++++++++++++++++++++++++++++++++++++++-
 src/master/master.hpp |   2 +-
 3 files changed, 116 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/89ace2cc/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index f84089e..34c9023 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1101,9 +1101,10 @@ Future<Response> Master::Http::scheduler(
       master->acknowledge(framework, std::move(*call.mutable_acknowledge()));
       return Accepted();
 
-    // TODO(greggomann): Implement operation status acknowledgement.
     case scheduler::Call::ACKNOWLEDGE_OPERATION_STATUS:
-      return Forbidden("Operation status updates are not yet implemented");
+      master->acknowledgeOperationStatus(
+          framework, std::move(*call.mutable_acknowledge_operation_status()));
+      return Accepted();
 
     case scheduler::Call::RECONCILE:
       master->reconcile(framework, std::move(*call.mutable_reconcile()));

http://git-wip-us.apache.org/repos/asf/mesos/blob/89ace2cc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6a534c0..d68d6d5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6006,10 +6006,120 @@ void Master::acknowledge(
 }
 
 
-// TODO(greggomann): Implement operation status acknowledgement.
 void Master::acknowledgeOperationStatus(
     Framework* framework,
-    const scheduler::Call::AcknowledgeOperationStatus& acknowledge) {}
+    scheduler::Call::AcknowledgeOperationStatus&& acknowledge)
+{
+  // Checks an operation status acknowledgement and forwards it to the agent.
+  CHECK_NOTNULL(framework);
+  metrics->messages_operation_status_update_acknowledgement++;
+
+  const OperationID& operationId = acknowledge.operation_id();
+
+  // NOTE: These CHECKs abort the master on a malformed UUID or missing ID;
+  // presumably the call was already validated upstream (not shown here).
+  Try<id::UUID> statusUuid_ = id::UUID::fromBytes(acknowledge.uuid());
+  CHECK_SOME(statusUuid_);
+  const id::UUID statusUuid = statusUuid_.get();
+
+  CHECK(acknowledge.has_slave_id());
+  const SlaveID& slaveId = acknowledge.slave_id();
+  CHECK(acknowledge.has_resource_provider_id());
+
+  Slave* slave = slaves.registered.get(slaveId);
+  if (slave == nullptr) {
+    LOG(WARNING)
+      << "Cannot send operation status update acknowledgement for status "
+      << statusUuid << " of operation '" << operationId << "'"
+      << " of framework " << *framework << " to agent " << slaveId
+      << " because agent is not registered";
+
+    metrics->invalid_operation_status_update_acknowledgements++;
+    return;
+  }
+
+  if (!slave->connected) {
+    LOG(WARNING)
+      << "Cannot send operation status update acknowledgement for status "
+      << statusUuid << " of operation '" << operationId << "'"
+      << " of framework " << *framework << " to agent " << slaveId
+      << " because agent is disconnected";
+
+    metrics->invalid_operation_status_update_acknowledgements++;
+    return;
+  }
+
+  if (!slave->capabilities.resourceProvider) {
+    LOG(WARNING)
+      << "Cannot send operation status update acknowledgement for status "
+      << statusUuid << " of operation '" << operationId << "'"
+      << " of framework " << *framework << " to agent " << slaveId
+      << " because the agent does not support resource providers";
+
+    metrics->invalid_operation_status_update_acknowledgements++;
+    return;
+  }
+
+  const Option<UUID> operationUuid_ =
+    framework->operationUUIDs.get(operationId);
+
+  if (operationUuid_.isNone()) {
+    LOG(WARNING)
+      << "Cannot send operation status update acknowledgement for status "
+      << statusUuid << " of operation '" << operationId << "'"
+      << " of framework " << *framework << " to agent " << slaveId
+      << " because the operation is unknown";
+
+    metrics->invalid_operation_status_update_acknowledgements++;
+    return;
+  }
+  const UUID operationUuid = operationUuid_.get();
+
+  Operation* operation = slave->getOperation(operationUuid);
+  CHECK_NOTNULL(operation);
+
+  auto it = std::find_if(
+      operation->statuses().begin(),
+      operation->statuses().end(),
+      [&statusUuid](const OperationStatus& operationStatus) {
+        return operationStatus.has_uuid() &&
+               operationStatus.uuid().value() == statusUuid.toBytes();
+      });
+
+  if (it == operation->statuses().end()) {
+    LOG(WARNING)
+      << "Ignoring operation status acknowledgement for status " << statusUuid
+      << " of operation '" << operationId << "'"
+      << " (uuid " << operationUuid << ")"
+      << " of framework " << *framework
+      << " because the operation status is unknown";
+
+    metrics->invalid_operation_status_update_acknowledgements++;
+    return;
+  }
+
+  const OperationStatus& acknowledgedStatus = *it;
+
+  LOG(INFO) << "Processing ACKNOWLEDGE_OPERATION_STATUS call for status "
+            << statusUuid << " of operation '" << operationId << "'"
+            << " (uuid " << operationUuid << ")"
+            << " of framework " << *framework << " on agent " << slaveId;
+
+  // If the acknowledged status update is terminal, remove the operation.
+  if (protobuf::isTerminalState(acknowledgedStatus.state())) {
+    removeOperation(operation);
+  }
+
+  AcknowledgeOperationStatusMessage message;
+  message.mutable_status_uuid()->set_value(statusUuid.toBytes());
+  *message.mutable_operation_uuid() = operationUuid;
+  *message.mutable_resource_provider_id() =
+    std::move(*acknowledge.mutable_resource_provider_id());
+
+  send(slave->pid, message);
+
+  metrics->valid_operation_status_update_acknowledgements++;
+}
 
 
 void Master::schedulerMessage(

http://git-wip-us.apache.org/repos/asf/mesos/blob/89ace2cc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7e5d1df..54ead7c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1070,7 +1070,7 @@ private:
 
   void acknowledgeOperationStatus(
       Framework* framework,
-      const scheduler::Call::AcknowledgeOperationStatus& acknowledge);
+      scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
 
   void reconcile(
       Framework* framework,