Posted to commits@mesos.apache.org by ji...@apache.org on 2017/10/29 15:08:38 UTC

[1/7] mesos git commit: Disallowed combining resource providers and CheckpointResourcesMessage.

Repository: mesos
Updated Branches:
  refs/heads/master 882e1dce8 -> 75cad1213


Disallowed combining resource providers and CheckpointResourcesMessage.

Offer operations on resource provider resources can require
asynchronous handling since they can in principle take a long time to
complete. Additionally, they can fail even after passing validation in
the master, e.g., due to outside changes to the affected resources.
For these reasons, resource provider resources require an offer
operation protocol allowing failures outside of the master and
communicating these failures to the master.

Since this feedback can only be provided asynchronously, resource
provider resources are incompatible with `CheckpointResourcesMessage`
which by design updates the agent with the master's view of the
agent's resources, and does not account for asynchronous changes to
the agent's resources (leading, e.g., to inconsistent state between
master and agents).

This patch makes sure that agents with resource providers do not use
the `CheckpointResourcesMessage` protocol. This prevents users from
running resource provider agents against legacy masters.
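The guard added to `Slave::checkpointResources()` amounts to a single scan over the agent's total resources. What follows is a minimal standalone sketch. It assumes a simplified, hypothetical `Resource` struct in place of the Mesos protobuf, in which an unset `provider_id` marks resources the agent provides directly. Unlike the patch, which aborts via `CHECK`, the sketch only reports whether a `CheckpointResourcesMessage` could safely be applied.

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for the Mesos `Resource` protobuf; only the field
// relevant to the guard is modeled.
struct Resource {
  std::string name;
  std::optional<std::string> provider_id;  // Set for provider-backed resources.

  bool has_provider_id() const { return provider_id.has_value(); }
};

// True if any resource is backed by a resource provider (same predicate
// as the `std::any_of` scan in the patch).
bool hasResourceProviders(const std::vector<Resource>& totalResources) {
  return std::any_of(
      totalResources.begin(),
      totalResources.end(),
      [](const Resource& resource) { return resource.has_provider_id(); });
}

// A master-driven checkpoint is only safe without resource providers.
// The patch enforces this with CHECK instead of returning a value.
bool canApplyCheckpointResources(const std::vector<Resource>& totalResources) {
  return !hasResourceProviders(totalResources);
}
```

For example, `{{"cpus", std::nullopt}, {"disk", std::string("rp-1")}}` contains a provider-backed disk, so `canApplyCheckpointResources` returns `false` for it.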

Review: https://reviews.apache.org/r/62974/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/75cad121
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/75cad121
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/75cad121

Branch: refs/heads/master
Commit: 75cad1213e218a7f114c46c3d7c92047dae80345
Parents: 33d1ff1
Author: Benjamin Bannier <bb...@apache.org>
Authored: Fri Oct 13 21:05:27 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 src/slave/slave.cpp | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/75cad121/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d8477b4..8e4785f 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3438,6 +3438,18 @@ void Slave::checkpointResources(vector<Resource> _checkpointedResources)
   //      happens, we expect framework to reconcile based on the
   //      offers they get.
 
+  // An agent with resource providers requires an offer operation feedback
+  // protocol instead of simply checkpointing results by the master. Fail hard
+  // here instead of applying an incompatible message.
+  const bool hasResourceProviders = std::any_of(
+      totalResources.begin(),
+      totalResources.end(),
+      [](const Resource& resource) { return resource.has_provider_id(); });
+
+  CHECK(!hasResourceProviders)
+    << "Master protocol for offer operations is incompatible with agent with "
+       "resource providers";
+
   convertResourceFormat(&_checkpointedResources, POST_RESERVATION_REFINEMENT);
 
   Resources newCheckpointedResources = _checkpointedResources;


[5/7] mesos git commit: Added resource version uuid for offer operations.

Posted by ji...@apache.org.
Added resource version uuid for offer operations.

The resource version UUID is used to establish the relationship
between the operation and the resources that the operation is
operating on. Each resource provider will keep a resource version
UUID, and change it when it believes that the resources from this
resource provider are out of sync with the master's view. The master
will keep track of the last known resource version UUID for each
resource provider, and attach the resource version UUID in each
operation it sends out. The resource provider should reject operations
that have a different resource version UUID than the one it maintains,
because this means the operation is operating on resources that might
have already been invalidated.
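The version handshake described above can be sketched in a few lines. This is an illustration under stated assumptions, not the Mesos implementation. `ResourceProviderSketch`, `MasterSketch`, and `newVersion()` are hypothetical names. A counter-based token stands in for a random UUID. An unset provider ID stands for the agent's own resources, mirroring the `ResourceVersionUUID` message added in this commit.

```cpp
#include <map>
#include <optional>
#include <string>

// Hypothetical provider ID; std::nullopt denotes resources owned directly
// by the agent (no backing resource provider).
using ProviderId = std::optional<std::string>;

// Stand-in for a random UUID: returns a fresh, never-repeating token.
std::string newVersion() {
  static unsigned long counter = 0;
  return "version-" + std::to_string(++counter);
}

// Provider side: owns the authoritative version and changes it whenever
// it believes its resources are out of sync with the master's view.
class ResourceProviderSketch {
 public:
  const std::string& version() const { return version_; }

  void markOutOfSync() { version_ = newVersion(); }

  // Rejects operations carrying a different version, since they may
  // target resources that have already been invalidated.
  bool accept(const std::string& operationVersion) const {
    return operationVersion == version_;
  }

 private:
  std::string version_ = newVersion();
};

// Master side: remembers the last version reported for each provider and
// attaches it to every outgoing operation.
class MasterSketch {
 public:
  void update(const ProviderId& id, const std::string& version) {
    lastKnown_[id] = version;
  }

  // Throws std::out_of_range if no version was ever reported for `id`.
  std::string stampOperation(const ProviderId& id) const {
    return lastKnown_.at(id);
  }

 private:
  std::map<ProviderId, std::string> lastKnown_;
};
```

In this sketch the master learns a new version only through `update()`, which stands in for the agent's and provider's state reports. Any operation stamped before that update is therefore rejected rather than applied to stale resources.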

Review: https://reviews.apache.org/r/63094


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6b9d0c54
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6b9d0c54
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6b9d0c54

Branch: refs/heads/master
Commit: 6b9d0c5426fa7c0a827b8e795264b4ddc9a80aa9
Parents: 9490730
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Oct 17 16:02:24 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 .../resource_provider/resource_provider.proto   | 28 ++++++++++++++
 .../resource_provider/resource_provider.proto   | 28 ++++++++++++++
 src/messages/messages.proto                     | 39 ++++++++++++++++++++
 src/tests/resource_provider_manager_tests.cpp   |  1 +
 .../resource_provider_validation_tests.cpp      |  3 +-
 5 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6b9d0c54/include/mesos/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index 1b26f89..1e44b95 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -52,6 +52,20 @@ message Event {
     // independently from the framework-specified operation ID, which
     // is optional.
     required bytes operation_uuid = 3;
+
+    // Used to establish the relationship between the operation and
+    // the resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when
+    // it believes that the resources from this resource provider are
+    // out of sync from the master's view. The master will keep track
+    // of the last known resource version UUID for each resource
+    // provider, and attach the resource version UUID in each
+    // operation it sends out. The resource provider should reject
+    // operations that have a different resource version UUID than
+    // that it maintains, because this means the operation is
+    // operating on resources that might have already been
+    // invalidated.
+    required bytes resource_version_uuid = 4;
   }
 
   optional Type type = 1;
@@ -98,6 +112,20 @@ message Call {
 
     // The total resources provided by this resource provider.
     repeated Resource resources = 2;
+
+    // Used to establish the relationship between the operation and
+    // the resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when
+    // it believes that the resources from this resource provider are
+    // out of sync from the master's view. The master will keep track
+    // of the last known resource version UUID for each resource
+    // provider, and attach the resource version UUID in each
+    // operation it sends out. The resource provider should reject
+    // operations that have a different resource version UUID than
+    // that it maintains, because this means the operation is
+    // operating on resources that might have already been
+    // invalidated.
+    required bytes resource_version_uuid = 3;
   }
 
   // Identifies who generated this call.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6b9d0c54/include/mesos/v1/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index 3afdbb9..3956f82 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -52,6 +52,20 @@ message Event {
     // independently from the framework specified operation id, which is
     // optional.
     required bytes operation_uuid = 3;
+
+    // Used to establish the relationship between the operation and
+    // the resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when
+    // it believes that the resources from this resource provider are
+    // out of sync from the master's view. The master will keep track
+    // of the last known resource version UUID for each resource
+    // provider, and attach the resource version UUID in each
+    // operation it sends out. The resource provider should reject
+    // operations that have a different resource version UUID than
+    // that it maintains, because this means the operation is
+    // operating on resources that might have already been
+    // invalidated.
+    required bytes resource_version_uuid = 4;
   }
 
   optional Type type = 1;
@@ -98,6 +112,20 @@ message Call {
 
     // The total resources provided by this resource provider.
     repeated Resource resources = 2;
+
+    // Used to establish the relationship between the operation and
+    // the resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when
+    // it believes that the resources from this resource provider are
+    // out of sync from the master's view. The master will keep track
+    // of the last known resource version UUID for each resource
+    // provider, and attach the resource version UUID in each
+    // operation it sends out. The resource provider should reject
+    // operations that have a different resource version UUID than
+    // that it maintains, because this means the operation is
+    // operating on resources that might have already been
+    // invalidated.
+    required bytes resource_version_uuid = 3;
   }
 
   // Identifies who generated this call.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6b9d0c54/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 063a7c0..9dfe7f0 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -605,6 +605,19 @@ message CheckpointResourcesMessage {
 
 
 /**
+ * Describes a resource version using a UUID. It is used to establish
+ * the relationship between an offer operation and the resources that
+ * offer operation is operating on.
+ */
+message ResourceVersionUUID {
+  // If not set, it represents resources directly from the agent (not
+  // having a backing resource provider).
+  optional ResourceProviderID resource_provider_id = 1;
+  required bytes uuid = 2;
+}
+
+
+/**
  * This message is sent by the agent to the master to inform the
  * master about the total amount of oversubscribed (allocated and
  * allocatable), or total resources. For `RESOURCE_PROVIDER` capable
@@ -639,6 +652,19 @@ message UpdateSlaveMessage {
   // Pending operations or terminal operations that have
   // unacknowledged status updates, which are known to this agent.
   optional OfferOperations offer_operations = 6;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when
+  // it believes that the resources from this resource provider are
+  // out of sync from the master's view.  The master will keep track
+  // of the last known resource version UUID for each resource
+  // provider, and attach the resource version UUID in each operation
+  // it sends out. The resource provider should reject operations that
+  // have a different resource version UUID than that it maintains,
+  // because this means the operation is operating on resources that
+  // might have already been invalidated.
+  repeated ResourceVersionUUID resource_version_uuids = 7;
 }
 
 
@@ -675,6 +701,19 @@ message ApplyOfferOperationMessage {
   // independently from the framework-specified operation ID, which is
   // optional.
   required bytes operation_uuid = 3;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when
+  // it believes that the resources from this resource provider are
+  // out of sync from the master's view.  The master will keep track
+  // of the last known resource version UUID for each resource
+  // provider, and attach the resource version UUID in each operation
+  // it sends out. The resource provider should reject operations that
+  // have a different resource version UUID than that it maintains,
+  // because this means the operation is operating on resources that
+  // might have already been invalidated.
+  required ResourceVersionUUID resource_version_uuid = 4;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6b9d0c54/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index f0fd5e8..4008b1c 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -286,6 +286,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     Call::UpdateState* updateState = call.mutable_update_state();
 
     updateState->mutable_resources()->CopyFrom(v1::Resources(resources));
+    updateState->set_resource_version_uuid(UUID::random().toBytes());
 
     http::Request request;
     request.method = "POST";

http://git-wip-us.apache.org/repos/asf/mesos/blob/6b9d0c54/src/tests/resource_provider_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp
index e6960b1..bf789a0 100644
--- a/src/tests/resource_provider_validation_tests.cpp
+++ b/src/tests/resource_provider_validation_tests.cpp
@@ -101,7 +101,8 @@ TEST(ResourceProviderCallValidationTest, UpdateState)
   error = call::validate(call);
   EXPECT_SOME(error);
 
-  call.mutable_update_state();
+  Call::UpdateState* updateState = call.mutable_update_state();
+  updateState->set_resource_version_uuid(UUID::random().toBytes());
 
   error = call::validate(call);
   EXPECT_NONE(error);


[7/7] mesos git commit: Added RESOURCE_PROVIDER agent capability.

Posted by ji...@apache.org.
Added RESOURCE_PROVIDER agent capability.

Review: https://reviews.apache.org/r/62877


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5b83b31c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5b83b31c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5b83b31c

Branch: refs/heads/master
Commit: 5b83b31c91fa35ff55a2d29a05eb526330343b44
Parents: 6b9d0c5
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Oct 10 16:09:30 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 include/mesos/mesos.proto     | 9 +++++++++
 include/mesos/v1/mesos.proto  | 9 +++++++++
 src/common/protobuf_utils.cpp | 3 ++-
 src/common/protobuf_utils.hpp | 7 +++++++
 4 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5b83b31c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index ca0f395..12652fe 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -965,6 +965,15 @@ message SlaveInfo {
       // NOTE: Resources are said to have refined reservations if it uses the
       // `Resource.reservations` field, and `Resource.reservations_size() > 1`.
       RESERVATION_REFINEMENT = 3;
+
+      // This expresses the ability for the agent to handle resource
+      // provider related operations. This includes the following:
+      //
+      // (1) The ability to report resources that are provided by some
+      //     local resource providers through the resource provider API.
+      //
+      // (2) The ability to provide offer operations feedback.
+      RESOURCE_PROVIDER = 4;
     }
 
     // Enum fields should be optional, see: MESOS-4997.

http://git-wip-us.apache.org/repos/asf/mesos/blob/5b83b31c/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index c97b180..e0797ae 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -957,6 +957,15 @@ message AgentInfo {
       // NOTE: Resources are said to have refined reservations if it uses the
       // `Resource.reservations` field, and `Resource.reservations_size() > 1`.
       RESERVATION_REFINEMENT = 3;
+
+      // This expresses the ability for the agent to handle resource
+      // provider related operations. This includes the following:
+      //
+      // (1) The ability to report resources that are provided by some
+      //     local resource providers through the resource provider API.
+      //
+      // (2) The ability to provide offer operations feedback.
+      RESOURCE_PROVIDER = 4;
     }
 
     // Enum fields should be optional, see: MESOS-4997.

http://git-wip-us.apache.org/repos/asf/mesos/blob/5b83b31c/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index fd4858a..cbb7947 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -751,7 +751,8 @@ bool operator==(const Capabilities& left, const Capabilities& right)
   // equality.
   return left.multiRole == right.multiRole &&
          left.hierarchicalRole == right.hierarchicalRole &&
-         left.reservationRefinement == right.reservationRefinement;
+         left.reservationRefinement == right.reservationRefinement &&
+         left.resourceProvider == right.resourceProvider;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5b83b31c/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index c43ab75..d5fef9c 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -208,6 +208,9 @@ struct Capabilities
         case SlaveInfo::Capability::RESERVATION_REFINEMENT:
           reservationRefinement = true;
           break;
+        case SlaveInfo::Capability::RESOURCE_PROVIDER:
+          resourceProvider = true;
+          break;
         // If adding another case here be sure to update the
         // equality operator.
       }
@@ -218,6 +221,7 @@ struct Capabilities
   bool multiRole = false;
   bool hierarchicalRole = false;
   bool reservationRefinement = false;
+  bool resourceProvider = false;
 
   google::protobuf::RepeatedPtrField<SlaveInfo::Capability>
   toRepeatedPtrField() const
@@ -232,6 +236,9 @@ struct Capabilities
     if (reservationRefinement) {
       result.Add()->set_type(SlaveInfo::Capability::RESERVATION_REFINEMENT);
     }
+    if (resourceProvider) {
+      result.Add()->set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+    }
 
     return result;
   }
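The `Capabilities` helper keeps one boolean per capability. The comment inside its switch warns that every new flag must also be compared in `operator==` and emitted by `toRepeatedPtrField()`. Below is a self-contained sketch of that round trip. It assumes a plain `std::vector` of a hypothetical `CapabilityType` enum instead of the protobuf `RepeatedPtrField<SlaveInfo::Capability>`, and `CapabilitiesSketch` is likewise an illustrative name.

```cpp
#include <vector>

// Hypothetical mirror of the agent capability types touched by this patch.
enum class CapabilityType {
  MULTI_ROLE,
  HIERARCHICAL_ROLE,
  RESERVATION_REFINEMENT,
  RESOURCE_PROVIDER,
};

struct CapabilitiesSketch {
  bool multiRole = false;
  bool hierarchicalRole = false;
  bool reservationRefinement = false;
  bool resourceProvider = false;

  // Sets one flag per reported capability, like the switch in the patch.
  explicit CapabilitiesSketch(const std::vector<CapabilityType>& types) {
    for (CapabilityType type : types) {
      switch (type) {
        case CapabilityType::MULTI_ROLE:
          multiRole = true;
          break;
        case CapabilityType::HIERARCHICAL_ROLE:
          hierarchicalRole = true;
          break;
        case CapabilityType::RESERVATION_REFINEMENT:
          reservationRefinement = true;
          break;
        case CapabilityType::RESOURCE_PROVIDER:
          resourceProvider = true;
          break;
      }
    }
  }

  // Inverse of the constructor, analogous to toRepeatedPtrField().
  std::vector<CapabilityType> toVector() const {
    std::vector<CapabilityType> result;
    if (multiRole) result.push_back(CapabilityType::MULTI_ROLE);
    if (hierarchicalRole) result.push_back(CapabilityType::HIERARCHICAL_ROLE);
    if (reservationRefinement) result.push_back(CapabilityType::RESERVATION_REFINEMENT);
    if (resourceProvider) result.push_back(CapabilityType::RESOURCE_PROVIDER);
    return result;
  }
};

// Every flag must be compared. Omitting the new one would make two sets
// that differ only in `resourceProvider` compare equal.
bool operator==(const CapabilitiesSketch& left, const CapabilitiesSketch& right) {
  return left.multiRole == right.multiRole &&
         left.hierarchicalRole == right.hierarchicalRole &&
         left.reservationRefinement == right.reservationRefinement &&
         left.resourceProvider == right.resourceProvider;
}
```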


[4/7] mesos git commit: Updated protobuf definitions related to offer operations.

Posted by ji...@apache.org.
Updated protobuf definitions related to offer operations.

Review: https://reviews.apache.org/r/63001


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d6ac32dc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d6ac32dc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d6ac32dc

Branch: refs/heads/master
Commit: d6ac32dc9c9013f24a9e41d3d32b7421c333f1e1
Parents: 882e1dc
Author: Jie Yu <yu...@gmail.com>
Authored: Sat Oct 14 21:28:36 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 include/mesos/mesos.proto                       | 72 ++++++++++++++++++++
 .../resource_provider/resource_provider.proto   | 46 +++++++------
 include/mesos/v1/mesos.proto                    | 72 ++++++++++++++++++++
 .../resource_provider/resource_provider.proto   | 44 ++++++------
 src/messages/messages.proto                     | 48 ++++++++++++-
 src/resource_provider/manager.cpp               | 17 +++--
 src/resource_provider/validation.cpp            |  6 +-
 .../resource_provider_validation_tests.cpp      | 20 ++++--
 8 files changed, 268 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 859fdff..ca0f395 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -107,6 +107,15 @@ message ResourceProviderID {
 
 
 /**
+ * A framework-generated ID to distinguish an offer operation. The ID
+ * must be unique within the framework.
+ */
+message OfferOperationID {
+  required string value = 1;
+}
+
+
+/**
  * Represents time since the epoch, in nanoseconds.
  */
 message TimeInfo {
@@ -1869,6 +1878,7 @@ message Offer {
     }
 
     optional Type type = 1;
+    optional OfferOperationID id = 12;
     optional Launch launch = 2;
     optional LaunchGroup launch_group = 7;
     optional Reserve reserve = 3;
@@ -2137,6 +2147,68 @@ message TaskResourceLimitation {
 
 
 /**
+ * Describes an offer operation, similar to `Offer.Operation`, with
+ * some additional information.
+ */
+message OfferOperation {
+  required FrameworkID framework_id = 1;
+  required Offer.Operation info = 2;
+
+  // All the statuses known to this offer operation. Some of the
+  // statuses in this list might not have been acknowledged yet. The
+  // statuses are ordered.
+  repeated OfferOperationStatus statuses = 3;
+
+  // This is the internal UUID for the operation, which is kept
+  // independently from the framework-specified operation ID, which is
+  // optional.
+  required bytes operation_uuid = 4;
+}
+
+
+/**
+ * Describes possible offer operation states.
+ */
+enum OfferOperationState {
+  // Default value if the enum is not set. See MESOS-4997.
+  OFFER_OPERATION_UNSUPPORTED = 0;
+
+  // Initial state.
+  OFFER_OPERATION_PENDING = 1;
+
+  // The operation was successfully applied.
+  // This state is terminal.
+  OFFER_OPERATION_FINISHED = 2;
+
+  // The operation failed to apply.
+  // This state is terminal.
+  OFFER_OPERATION_FAILED = 3;
+
+  // The operation description contains an error.
+  OFFER_OPERATION_ERROR = 4;
+}
+
+
+/**
+ * Describes the current status of an offer operation.
+ */
+message OfferOperationStatus {
+  optional OfferOperationID operation_id = 1;
+  required OfferOperationState state = 2;
+  optional string message = 3;
+
+  // Converted resources after applying the operation. This only
+  // applies if the `state` is `OFFER_OPERATION_FINISHED`.
+  repeated Resource converted_resources = 4;
+
+  // Statuses that are delivered reliably to the scheduler will
+  // include a `status_uuid`. The status is considered delivered once
+  // it is acknowledged by the scheduler.
+  optional bytes status_uuid = 5;
+}
+
+
+/**
 * Describes the status of a check. Type and the corresponding field, i.e.,
 * `command` or `http` must be set. If the result of the check is not available
 * (e.g., the check timed out), these fields must contain empty messages, i.e.,
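The new `OfferOperationState` enum marks `OFFER_OPERATION_FINISHED` and `OFFER_OPERATION_FAILED` as terminal. In addition, `OfferOperationStatus.converted_resources` only applies when the state is `OFFER_OPERATION_FINISHED`. The following consumer-side sketch checks those two rules. It assumes a hypothetical plain-C++ mirror of the messages (`StatusSketch` is not a Mesos type) and represents resources as strings. The proto comments do not say whether `OFFER_OPERATION_ERROR` is terminal, so the sketch treats only the two documented states as terminal.

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical mirror of OfferOperationState; values follow the proto.
enum class OfferOperationState {
  OFFER_OPERATION_UNSUPPORTED = 0,
  OFFER_OPERATION_PENDING = 1,
  OFFER_OPERATION_FINISHED = 2,
  OFFER_OPERATION_FAILED = 3,
  OFFER_OPERATION_ERROR = 4,
};

// Only the states that the proto comments document as terminal.
bool isTerminal(OfferOperationState state) {
  return state == OfferOperationState::OFFER_OPERATION_FINISHED ||
         state == OfferOperationState::OFFER_OPERATION_FAILED;
}

// Hypothetical mirror of OfferOperationStatus.
struct StatusSketch {
  std::optional<std::string> operation_id;  // Framework-specified, optional.
  OfferOperationState state;
  std::vector<std::string> converted_resources;
  std::optional<std::string> status_uuid;   // Set only for reliable delivery.
};

// Converted resources are only meaningful for a finished operation.
bool isWellFormed(const StatusSketch& status) {
  return status.converted_resources.empty() ||
         status.state == OfferOperationState::OFFER_OPERATION_FINISHED;
}

// A status carrying a `status_uuid` counts as delivered only once the
// scheduler acknowledges it.
bool needsAcknowledgement(const StatusSketch& status) {
  return status.status_uuid.has_value();
}
```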

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/include/mesos/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index f5a9073..ef67aa1 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -45,7 +45,13 @@ message Event {
   // Received when the master wants to send an operation to the
   // resource provider.
   message Operation {
-    optional Offer.Operation operation = 1;
+    required FrameworkID framework_id = 1;
+    required Offer.Operation info = 2;
+
+    // This is the internal UUID for the operation, which is kept
+    // independently from the framework-specified operation ID, which
+    // is optional.
+    required bytes operation_uuid = 3;
   }
 
   optional Type type = 1;
@@ -62,8 +68,8 @@ message Call {
     // in a backwards-compatible way. See: MESOS-4997.
     UNKNOWN = 0;
 
-    SUBSCRIBE = 1;    // See 'Subscribe' below.
-    UPDATE = 2;       // See 'Update' below.
+    SUBSCRIBE = 1;                     // See 'Subscribe'.
+    UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'.
   }
 
   // Request to subscribe with the master.
@@ -74,25 +80,25 @@ message Call {
   // and then updates its resources.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
+
+    // This includes pending operations and those that have
+    // unacknowledged statuses.
+    repeated OfferOperation operations = 3;
+
+    // The total resources provided by this resource provider.
     repeated Resource resources = 2;
   }
 
-  // Notify the master about the status of an operation.
-  message Update {
-    enum State {
-      // This must be the first enum value in this list, to
-      // ensure that if 'state' is not set, the default value
-      // is UNKNOWN. This enables enum values to be added
-      // in a backwards-compatible way. See: MESOS-4997.
-      UNKNOWN = 0;
-
-      OK = 1;
-      FAILED = 2;
-    }
-
-    required State state = 1;
-    required Offer.Operation operation = 2;
-    repeated Resource converted_resources = 3;
+  // Notify the master about the status of an offer operation.
+  message UpdateOfferOperationStatus {
+    required FrameworkID framework_id = 1;
+    required OfferOperationStatus status = 2;
+    optional OfferOperationStatus latest_status = 3;
+
+    // This is the internal UUID for the operation, which is kept
+    // independently from the framework-specified operation ID, which
+    // is optional.
+    required bytes operation_uuid = 4;
   }
 
   // Identifies who generated this call.
@@ -107,5 +113,5 @@ message Call {
 
   optional Type type = 2;
   optional Subscribe subscribe = 3;
-  optional Update update = 4;
+  optional UpdateOfferOperationStatus update_offer_operation_status = 4;
 }
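When it subscribes, a resource provider reports in `Subscribe.operations` the operations that are still pending, together with those that have unacknowledged statuses. A minimal sketch of that selection follows. It assumes hypothetical types (`OperationSketch`, `StatusEntry`, a per-status `acknowledged` flag) rather than the generated protobuf classes. It also assumes that an operation counts as pending when it has no status yet or its latest status is `PENDING`.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical, reduced set of operation states for this sketch.
enum class State { PENDING, FINISHED, FAILED };

struct StatusEntry {
  State state;
  bool acknowledged;  // Hypothetical: whether the scheduler acked this status.
};

struct OperationSketch {
  std::string operation_uuid;         // Internal UUID, as in the proto.
  std::vector<StatusEntry> statuses;  // Ordered, oldest first.
};

// Assumed definition: no status yet, or the latest status is PENDING.
bool isPending(const OperationSketch& op) {
  return op.statuses.empty() || op.statuses.back().state == State::PENDING;
}

bool hasUnacknowledgedStatus(const OperationSketch& op) {
  return std::any_of(
      op.statuses.begin(),
      op.statuses.end(),
      [](const StatusEntry& status) { return !status.acknowledged; });
}

// Selects the operations to include in `Subscribe.operations`.
std::vector<OperationSketch> operationsForSubscribe(
    const std::vector<OperationSketch>& all) {
  std::vector<OperationSketch> result;
  for (const OperationSketch& op : all) {
    if (isPending(op) || hasUnacknowledgedStatus(op)) {
      result.push_back(op);
    }
  }
  return result;
}
```

Operations that are terminal and fully acknowledged are left out, which keeps the subscription message bounded by outstanding work.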

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index cfd4abd..c97b180 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -105,6 +105,15 @@ message ResourceProviderID {
 
 
 /**
+ * A framework-generated ID to distinguish an offer operation. The ID
+ * must be unique within the framework.
+ */
+message OfferOperationID {
+  required string value = 1;
+}
+
+
+/**
  * Represents time since the epoch, in nanoseconds.
  */
 message TimeInfo {
@@ -1850,6 +1859,7 @@ message Offer {
     }
 
     optional Type type = 1;
+    optional OfferOperationID id = 12;
     optional Launch launch = 2;
     optional LaunchGroup launch_group = 7;
     optional Reserve reserve = 3;
@@ -2118,6 +2128,68 @@ message TaskResourceLimitation {
 
 
 /**
+ * Describes an offer operation, similar to `Offer.Operation`, with
+ * some additional information.
+ */
+message OfferOperation {
+  required FrameworkID framework_id = 1;
+  required Offer.Operation info = 2;
+
+  // All the statuses known to this offer operation. Some of the
+  // statuses in this list might not have been acknowledged yet. The
+  // statuses are ordered.
+  repeated OfferOperationStatus statuses = 3;
+
+  // This is the internal UUID for the operation, which is kept
+  // independently from the framework-specified operation ID, which is
+  // optional.
+  required bytes operation_uuid = 4;
+}
+
+
+/**
+ * Describes possible offer operation states.
+ */
+enum OfferOperationState {
+  // Default value if the enum is not set. See MESOS-4997.
+  OFFER_OPERATION_UNSUPPORTED = 0;
+
+  // Initial state.
+  OFFER_OPERATION_PENDING = 1;
+
+  // The operation was successfully applied.
+  // This state is terminal.
+  OFFER_OPERATION_FINISHED = 2;
+
+  // The operation failed to apply.
+  // This state is terminal.
+  OFFER_OPERATION_FAILED = 3;
+
+  // The operation description contains an error.
+  OFFER_OPERATION_ERROR = 4;
+}
+
+
+/**
+ * Describes the current status of an offer operation.
+ */
+message OfferOperationStatus {
+  optional OfferOperationID operation_id = 1;
+  required OfferOperationState state = 2;
+  optional string message = 3;
+
+  // Converted resources after applying the operation. This only
+  // applies if the `state` is `OFFER_OPERATION_FINISHED`.
+  repeated Resource converted_resources = 4;
+
+  // Statuses that are delivered reliably to the scheduler will
+  // include a `status_uuid`. The status is considered delivered once
+  // it is acknowledged by the scheduler.
+  optional bytes status_uuid = 5;
+}
+
+
+/**
 * Describes the status of a check. Type and the corresponding field, i.e.,
 * `command` or `http` must be set. If the result of the check is not available
 * (e.g., the check timed out), these fields must contain empty messages, i.e.,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/include/mesos/v1/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index e5cbede..10b77c7 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -45,7 +45,13 @@ message Event {
   // Received when the master wants to send an operation to the
   // resource provider.
   message Operation {
-    optional Offer.Operation operation = 1;
+    required FrameworkID framework_id = 1;
+    required Offer.Operation info = 2;
+
+    // This is the internal UUID for the operation, which is kept
+    // independently from the framework specified operation id, which is
+    // optional.
+    required bytes operation_uuid = 3;
   }
 
   optional Type type = 1;
@@ -62,32 +68,32 @@ message Call {
     // in a backwards-compatible way. See: MESOS-4997.
     UNKNOWN = 0;
 
-    SUBSCRIBE = 1;    // See 'Subscribe' below.
-    UPDATE = 2;       // See 'Update' below.
+    SUBSCRIBE = 1;                     // See 'Subscribe'.
+    UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'.
   }
 
   // Request to subscribe with the master.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
+
+    // This includes pending operations and those that have
+    // unacknowledged statuses.
+    repeated OfferOperation operations = 3;
+
+    // The total resources provided by this resource provider.
     repeated Resource resources = 2;
   }
 
   // Notify the master about the status of an operation.
-  message Update {
-    enum State {
-      // This must be the first enum value in this list, to
-      // ensure that if 'state' is not set, the default value
-      // is UNKNOWN. This enables enum values to be added
-      // in a backwards-compatible way. See: MESOS-4997.
-      UNKNOWN = 0;
-
-      OK = 1;
-      FAILED = 2;
-    }
-
-    required State state = 1;
-    required Offer.Operation operation = 2;
-    repeated Resource converted_resources = 3;
+  message UpdateOfferOperationStatus {
+    required FrameworkID framework_id = 1;
+    required OfferOperationStatus status = 2;
+    optional OfferOperationStatus latest_status = 3;
+
+    // This is the internal UUID for the operation, which is kept
+    // independently from the framework specified operation id, which is
+    // optional.
+    required bytes operation_uuid = 4;
   }
 
   // Identifies who generated this call.
@@ -102,5 +108,5 @@ message Call {
 
   optional Type type = 2;
   optional Subscribe subscribe = 3;
-  optional Update update = 4;
+  optional UpdateOfferOperationStatus update_offer_operation_status = 4;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0a32b34..063a7c0 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -607,7 +607,9 @@ message CheckpointResourcesMessage {
 /**
  * This message is sent by the agent to the master to inform the
  * master about the total amount of oversubscribed (allocated and
- * allocatable), or total resources.
+ * allocatable), or total resources. For `RESOURCE_PROVIDER` capable
+ * agents, this message also includes the offer operations that are
+ * either pending, or terminal but have unacknowledged status updates.
  */
 message UpdateSlaveMessage {
   required SlaveID slave_id = 1;
@@ -629,6 +631,50 @@ message UpdateSlaveMessage {
 
   repeated Resource oversubscribed_resources = 2;
   repeated Resource total_resources = 4;
+
+  message OfferOperations {
+    repeated OfferOperation operations = 1;
+  }
+
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates, which are known to this agent.
+  optional OfferOperations offer_operations = 6;
+}
+
+
+/**
+ * This message is sent from the agent to the master to update the
+ * status of an offer operation.
+ *
+ * See resource_provider::Call::UPDATE_OFFER_OPERATION_STATUS.
+ */
+message OfferOperationStatusUpdate {
+  required FrameworkID framework_id = 1;
+  required SlaveID slave_id = 2;
+  required OfferOperationStatus status = 3;
+  optional OfferOperationStatus latest_status = 4;
+
+  // This is the internal UUID for the operation, which is kept
+  // independently from the framework-specified operation ID, which is
+  // optional.
+  required bytes operation_uuid = 5;
+}
+
+
+/**
+ * This message is sent from the master to the agent to apply an offer
+ * operation.
+ *
+ * See resource_provider::Event::OPERATION.
+ */
+message ApplyOfferOperationMessage {
+  required FrameworkID framework_id = 1;
+  required Offer.Operation operation_info = 2;
+
+  // This is the internal UUID for the operation, which is kept
+  // independently from the framework-specified operation ID, which is
+  // optional.
+  required bytes operation_uuid = 3;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 31fcb78..4713a2a 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -151,9 +151,9 @@ private:
       const HttpConnection& http,
       const Call::Subscribe& subscribe);
 
-  void update(
+  void updateOfferOperationStatus(
       ResourceProvider* resourceProvider,
-      const Call::Update& update);
+      const Call::UpdateOfferOperationStatus& update);
 
   ResourceProviderID newResourceProviderId();
 };
@@ -282,8 +282,11 @@ Future<http::Response> ResourceProviderManagerProcess::api(
       LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
     }
 
-    case Call::UPDATE: {
-      update(&resourceProvider, call.update());
+    case Call::UPDATE_OFFER_OPERATION_STATUS: {
+      updateOfferOperationStatus(
+          &resourceProvider,
+          call.update_offer_operation_status());
+
       return Accepted();
     }
   }
@@ -335,11 +338,11 @@ void ResourceProviderManagerProcess::subscribe(
 }
 
 
-void ResourceProviderManagerProcess::update(
+void ResourceProviderManagerProcess::updateOfferOperationStatus(
     ResourceProvider* resourceProvider,
-    const Call::Update& update)
+    const Call::UpdateOfferOperationStatus& update)
 {
-  // TODO(nfnt): Implement the 'UPDATE' call handler.
+  // TODO(nfnt): Implement the 'UPDATE_OFFER_OPERATION_STATUS' call handler.
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/src/resource_provider/validation.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.cpp b/src/resource_provider/validation.cpp
index d292722..d8a9fb0 100644
--- a/src/resource_provider/validation.cpp
+++ b/src/resource_provider/validation.cpp
@@ -50,13 +50,13 @@ Option<Error> validate(const Call& call)
       return None();
     }
 
-    case Call::UPDATE: {
+    case Call::UPDATE_OFFER_OPERATION_STATUS: {
       if (!call.has_resource_provider_id()) {
         return Error("Expecting 'resource_provider_id' to be present");
       }
 
-      if (!call.has_update()) {
-        return Error("Expecting 'update' to be present");
+      if (!call.has_update_offer_operation_status()) {
+        return Error("Expecting 'update_offer_operation_status' to be present");
       }
 
       return None();

http://git-wip-us.apache.org/repos/asf/mesos/blob/d6ac32dc/src/tests/resource_provider_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp
index f182bff..862e844 100644
--- a/src/tests/resource_provider_validation_tests.cpp
+++ b/src/tests/resource_provider_validation_tests.cpp
@@ -54,25 +54,31 @@ TEST(ResourceProviderCallValidationTest, Subscribe)
 }
 
 
-TEST(ResourceProviderCallValidationTest, Update)
+TEST(ResourceProviderCallValidationTest, UpdateOfferOperationStatus)
 {
   Call call;
-  call.set_type(Call::UPDATE);
+  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
 
-  // Expecting a resource provider ID and `Call::Update`.
+  // Expecting a resource provider ID and `Call::UpdateOfferOperationStatus`.
   Option<Error> error = call::validate(call);
   EXPECT_SOME(error);
 
   ResourceProviderID* id = call.mutable_resource_provider_id();
   id->set_value(UUID::random().toString());
 
-  // Still expecting `Call::Update`.
+  // Still expecting `Call::UpdateOfferOperationStatus`.
   error = call::validate(call);
   EXPECT_SOME(error);
 
-  Call::Update* update = call.mutable_update();
-  update->set_state(Call::Update::OK);
-  update->mutable_operation();
+  Call::UpdateOfferOperationStatus* update =
+    call.mutable_update_offer_operation_status();
+
+  update->mutable_framework_id()->set_value(UUID::random().toString());
+  update->set_operation_uuid(UUID::random().toBytes());
+
+  OfferOperationStatus* status = update->mutable_status();
+  status->mutable_operation_id()->set_value(UUID::random().toString());
+  status->set_state(OFFER_OPERATION_FINISHED);
 
   error = call::validate(call);
   EXPECT_NONE(error);


[2/7] mesos git commit: Sent CheckpointResourcesMessage only when reregister with an old master.

Posted by ji...@apache.org.
Sent CheckpointResourcesMessage only when reregister with an old master.

There is no need to send a CheckpointResourcesMessage to the agent if
the master does not have any state about the agent (e.g., after a
master failover).
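The decision described above can be sketched in C++ as follows. This is a minimal illustration, not the master's code: `AgentState` and `checkpointMessageOnReregister` are hypothetical, simplified stand-ins. The refinement check models the `downgradeResources()` failure path from the diff as a plain boolean.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified view of what the master knows about a
// re-registering agent. These are not the real Mesos types.
struct AgentState {
  bool knownToMaster;           // Master still holds state (no failover).
  bool reservationRefinement;   // Agent is RESERVATION_REFINEMENT-capable.
  bool hasRefinedReservations;  // Checkpointed resources use refinement.
  std::vector<std::string> checkpointedResources;
};

// Returns the resources the master would put into a
// CheckpointResourcesMessage, or std::nullopt if none should be sent.
std::optional<std::vector<std::string>> checkpointMessageOnReregister(
    const AgentState& agent) {
  // A master without state about the agent (e.g., after failover)
  // has nothing of its own to push to the agent.
  if (!agent.knownToMaster) {
    return std::nullopt;
  }

  // Models the downgrade failure: a non-refinement-capable agent
  // cannot accept refined reservations, so no message is sent.
  if (!agent.reservationRefinement && agent.hasRefinedReservations) {
    return std::nullopt;
  }

  return agent.checkpointedResources;
}
```

Keeping the send inside the "already known and connected" branch means the message is only built when the master's view can actually be ahead of the agent's.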

Review: https://reviews.apache.org/r/62878


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9ac9f82
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9ac9f82
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9ac9f82

Branch: refs/heads/master
Commit: e9ac9f8252a1aa01d72fb7b918bf723df8e7dd7b
Parents: 5b83b31
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Oct 10 17:43:28 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 79 ++++++++++++++++++++++------------------------
 1 file changed, 38 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e9ac9f82/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4b76648..c118b9d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6313,11 +6313,47 @@ void Master::_reregisterSlave(
     CHECK(slave->active)
       << "Unexpected connected but deactivated agent " << *slave;
 
-    // Inform the agent of the master's version of its checkpointed
-    // resources and the new framework pids for its tasks.
+    // Inform the agent of the new framework pids for its tasks.
     ___reregisterSlave(slave, frameworks);
 
     slaves.reregistering.erase(slaveInfo.id());
+
+    // Send checkpointed resources to the agent. This is important for
+    // the cases where the master didn't fail over. In that case, the
+    // master might have already applied an operation that the agent
+    // didn't see (e.g., due to a breaking connection). This message
+    // will sync the state between the master and the agent about
+    // checkpointed resources.
+    CheckpointResourcesMessage message;
+
+    message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+    if (!slave->capabilities.reservationRefinement) {
+      // If the agent is not refinement-capable, don't send it
+      // checkpointed resources that contain refined reservations. This
+      // might occur if a reservation refinement is created but never
+      // reaches the agent (e.g., due to network partition), and then
+      // the agent is downgraded before the partition heals.
+      //
+      // TODO(neilc): It would probably be better to prevent the agent
+      // from re-registering in this scenario.
+      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      if (result.isError()) {
+        LOG(WARNING) << "Not sending updated checkpointed resouces "
+                     << slave->checkpointedResources
+                     << " with refined reservations, since agent " << *slave
+                     << " is not RESERVATION_REFINEMENT-capable.";
+
+        return;
+      }
+    }
+
+    LOG(INFO) << "Sending updated checkpointed resources "
+              << slave->checkpointedResources
+              << " to agent " << *slave;
+
+    send(slave->pid, message);
+
     return;
   }
 
@@ -6657,45 +6693,6 @@ void Master::___reregisterSlave(
       recoverFramework(frameworkInfo, {});
     }
   }
-
-  // Send checkpointed resources to the agent. This is important for
-  // the cases where the master didn't fail over. In that case, the
-  // master might have already applied an operation that the agent
-  // didn't see (e.g., due to a breaking connection). This message
-  // will sync the state between the master and the agent about
-  // checkpointed resources.
-  //
-  // TODO(jieyu): This message is not necessary for the case where the
-  // master fails over. Consider moving this to `reconcileKnownSlave`.
-  CheckpointResourcesMessage message;
-
-  message.mutable_resources()->CopyFrom(slave->checkpointedResources);
-
-  if (!slave->capabilities.reservationRefinement) {
-    // If the agent is not refinement-capable, don't send it
-    // checkpointed resources that contain refined reservations. This
-    // might occur if a reservation refinement is created but never
-    // reaches the agent (e.g., due to network partition), and then
-    // the agent is downgraded before the partition heals.
-    //
-    // TODO(neilc): It would probably be better to prevent the agent
-    // from re-registering in this scenario.
-    Try<Nothing> result = downgradeResources(message.mutable_resources());
-    if (result.isError()) {
-      LOG(WARNING) << "Not sending updated checkpointed resouces "
-                   << slave->checkpointedResources
-                   << " with refined reservations, since agent " << *slave
-                   << " is not RESERVATION_REFINEMENT-capable.";
-
-      return;
-    }
-  }
-
-  LOG(INFO) << "Sending updated checkpointed resources "
-            << slave->checkpointedResources
-            << " to agent " << *slave;
-
-  send(slave->pid, message);
 }
 
 


[3/7] mesos git commit: Added a call to update total resources and pending operations.

Posted by ji...@apache.org.
Added a call to update total resources and pending operations.

Now a resource provider first subscribes (`SUBSCRIBE`) to the resource
provider manager without resources to get its ID. It then uses the ID
to prepare its checkpoints for recovery and its persistent work
directory, and finally reports its total resources and pending
operations through `UPDATE_STATE`.
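The two-step protocol above can be modeled with a small toy manager. This is a hedged sketch: `ToyManager`, `Resource`, and `UpdateTotalResources` are hypothetical simplifications that loosely follow the names in the diff. Also, the real `updateState()` uses `CHECK_EQ` on each resource's `provider_id`, while this sketch throws instead so the failure is observable.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins; not the real Mesos protobuf or manager types.
struct Resource {
  std::string name;
  std::string providerId;
};

struct UpdateTotalResources {
  std::string id;
  std::vector<Resource> total;
};

class ToyManager {
 public:
  // SUBSCRIBE carries no resources. A new ID is assigned only if the
  // provider does not already have one (e.g., recovered from disk).
  std::string subscribe(const std::optional<std::string>& existingId) {
    std::string id =
        existingId ? *existingId : "rp-" + std::to_string(nextId_++);
    providers_[id];  // Register the provider with no resources yet.
    return id;
  }

  // UPDATE_STATE replaces the provider's total resources and queues a
  // message analogous to UPDATE_TOTAL_RESOURCES for the subscriber.
  void updateState(const std::string& id,
                   const std::vector<Resource>& resources) {
    auto it = providers_.find(id);
    if (it == providers_.end()) {
      throw std::invalid_argument("unknown resource provider " + id);
    }

    // The provider, not the manager, now tags resources with its ID.
    for (const Resource& r : resources) {
      if (r.providerId != id) {
        throw std::invalid_argument("resource not tagged with " + id);
      }
    }

    it->second = resources;
    messages.push_back({id, resources});
  }

  std::vector<UpdateTotalResources> messages;

 private:
  int nextId_ = 0;
  std::map<std::string, std::vector<Resource>> providers_;
};
```

Note that subscribing alone produces no resource message; the manager only learns about resources once `updateState()` is called with the ID obtained from `subscribe()`.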

Review: https://reviews.apache.org/r/62903/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94907305
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94907305
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94907305

Branch: refs/heads/master
Commit: 94907305d6caf0c2f3d2dab19a4068fd2df63354
Parents: d6ac32d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Oct 18 16:37:41 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 .../resource_provider/resource_provider.proto   |  24 ++--
 .../resource_provider/resource_provider.proto   |  19 ++-
 src/resource_provider/manager.cpp               |  66 ++++++----
 src/resource_provider/validation.cpp            |  12 ++
 src/tests/resource_provider_manager_tests.cpp   | 132 ++++++++++++-------
 .../resource_provider_validation_tests.cpp      |  23 ++++
 6 files changed, 184 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/include/mesos/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index ef67aa1..1b26f89 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -70,23 +70,12 @@ message Call {
 
     SUBSCRIBE = 1;                     // See 'Subscribe'.
     UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'.
+    UPDATE_STATE = 3;                  // See 'UpdateState'.
   }
 
   // Request to subscribe with the master.
-  //
-  // TODO(bbannier): Once we have implemented a call to update a
-  // resource provider, consider removing resources here and instead
-  // moving to a protocol where a resource provider first subscribes
-  // and then updates its resources.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
-
-    // This includes pending operations and those that have
-    // unacknowledged statuses.
-    repeated OfferOperation operations = 3;
-
-    // The total resources provided by this resource provider.
-    repeated Resource resources = 2;
   }
 
   // Notify the master about the status of an offer operation.
@@ -101,6 +90,16 @@ message Call {
     required bytes operation_uuid = 4;
   }
 
+  // Notify the master about the total resources and pending operations.
+  message UpdateState {
+    // This includes pending operations and those that have
+    // unacknowledged statuses.
+    repeated OfferOperation operations = 1;
+
+    // The total resources provided by this resource provider.
+    repeated Resource resources = 2;
+  }
+
   // Identifies who generated this call.
   // The 'ResourceProviderManager' assigns a resource provider ID when
   // a new resource provider subscribes for the first time. Once
@@ -114,4 +113,5 @@ message Call {
   optional Type type = 2;
   optional Subscribe subscribe = 3;
   optional UpdateOfferOperationStatus update_offer_operation_status = 4;
+  optional UpdateState update_state = 5;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/include/mesos/v1/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index 10b77c7..3afdbb9 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -70,18 +70,12 @@ message Call {
 
     SUBSCRIBE = 1;                     // See 'Subscribe'.
     UPDATE_OFFER_OPERATION_STATUS = 2; // See 'UpdateOfferOperationStatus'.
+    UPDATE_STATE = 3;                  // See 'UpdateState'.
   }
 
   // Request to subscribe with the master.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
-
-    // This includes pending operations and those that have
-    // unacknowledged statuses.
-    repeated OfferOperation operations = 3;
-
-    // The total resources provided by this resource provider.
-    repeated Resource resources = 2;
   }
 
   // Notify the master about the status of an operation.
@@ -96,6 +90,16 @@ message Call {
     required bytes operation_uuid = 4;
   }
 
+  // Notify the master about the total resources and pending operations.
+  message UpdateState {
+    // This includes pending operations and those that have
+    // unacknowledged statuses.
+    repeated OfferOperation operations = 1;
+
+    // The total resources provided by this resource provider.
+    repeated Resource resources = 2;
+  }
+
   // Identifies who generated this call.
   // The 'ResourceProviderManager' assigns a resource provider ID when
   // a new resource provider subscribes for the first time. Once
@@ -109,4 +113,5 @@ message Call {
   optional Type type = 2;
   optional Subscribe subscribe = 3;
   optional UpdateOfferOperationStatus update_offer_operation_status = 4;
+  optional UpdateState update_state = 5;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 4713a2a..11f8901 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -120,11 +120,9 @@ struct ResourceProvider
 {
   ResourceProvider(
       const ResourceProviderInfo& _info,
-      const HttpConnection& _http,
-      const Resources& _resources)
+      const HttpConnection& _http)
     : info(_info),
-      http(_http),
-      resources(_resources) {}
+      http(_http) {}
 
   ResourceProviderInfo info;
   HttpConnection http;
@@ -155,6 +153,10 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdateOfferOperationStatus& update);
 
+  void updateState(
+      ResourceProvider* resourceProvider,
+      const Call::UpdateState& update);
+
   ResourceProviderID newResourceProviderId();
 };
 
@@ -289,6 +291,11 @@ Future<http::Response> ResourceProviderManagerProcess::api(
 
       return Accepted();
     }
+
+    case Call::UPDATE_STATE: {
+      updateState(&resourceProvider, call.update_state());
+      return Accepted();
+    }
   }
 
   UNREACHABLE();
@@ -301,16 +308,15 @@ void ResourceProviderManagerProcess::subscribe(
 {
   ResourceProviderInfo resourceProviderInfo =
     subscribe.resource_provider_info();
-  resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
 
-  // Inject the `ResourceProviderID` for all subscribed resources.
-  Resources resources;
-  foreach (Resource resource, subscribe.resources()) {
-    resource.mutable_provider_id()->CopyFrom(resourceProviderInfo.id());
-    resources += resource;
+  // TODO(chhsiao): Reject the subscription if it contains an unknown ID
+  // or there is already a subscribed instance with the same ID, and add
+  // tests for re-subscriptions.
+  if (!resourceProviderInfo.has_id()) {
+    resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
   }
 
-  ResourceProvider resourceProvider(resourceProviderInfo, http, resources);
+  ResourceProvider resourceProvider(resourceProviderInfo, http);
 
   Event event;
   event.set_type(Event::SUBSCRIBED);
@@ -324,17 +330,6 @@ void ResourceProviderManagerProcess::subscribe(
   }
 
   resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
-
-  ResourceProviderMessage message;
-  message.type = ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES;
-
-  ResourceProviderMessage::UpdateTotalResources updateTotalResources;
-  updateTotalResources.id = resourceProviderInfo.id();
-  updateTotalResources.total = resources;
-
-  message.updateTotalResources = std::move(updateTotalResources);
-
-  messages.put(std::move(message));
 }
 
 
@@ -346,6 +341,33 @@ void ResourceProviderManagerProcess::updateOfferOperationStatus(
 }
 
 
+void ResourceProviderManagerProcess::updateState(
+    ResourceProvider* resourceProvider,
+    const Call::UpdateState& update)
+{
+  Resources resources;
+
+  foreach (const Resource& resource, update.resources()) {
+    CHECK_EQ(resource.provider_id(), resourceProvider->info.id());
+    resources += resource;
+  }
+
+  resourceProvider->resources = std::move(resources);
+
+  // TODO(chhsiao): Report pending operations.
+
+  ResourceProviderMessage::UpdateTotalResources updateTotalResources;
+  updateTotalResources.id = resourceProvider->info.id();
+  updateTotalResources.total = resourceProvider->resources;
+
+  ResourceProviderMessage message;
+  message.type = ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES;
+  message.updateTotalResources = std::move(updateTotalResources);
+
+  messages.put(std::move(message));
+}
+
+
 ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
 {
   ResourceProviderID resourceProviderId;

http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/resource_provider/validation.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.cpp b/src/resource_provider/validation.cpp
index d8a9fb0..ddc57c3 100644
--- a/src/resource_provider/validation.cpp
+++ b/src/resource_provider/validation.cpp
@@ -61,6 +61,18 @@ Option<Error> validate(const Call& call)
 
       return None();
     }
+
+    case Call::UPDATE_STATE: {
+      if (!call.has_resource_provider_id()) {
+        return Error("Expecting 'resource_provider_id' to be present");
+      }
+
+      if (!call.has_update_state()) {
+        return Error("Expecting 'update_state' to be present");
+      }
+
+      return None();
+    }
   }
 
   UNREACHABLE();

http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index ca49e1f..f0fd5e8 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -77,6 +77,7 @@ using process::Future;
 using process::Owned;
 using process::PID;
 
+using process::http::Accepted;
 using process::http::BadRequest;
 using process::http::OK;
 using process::http::UnsupportedMediaType;
@@ -210,78 +211,107 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
 }
 
 
-TEST_P(ResourceProviderManagerHttpApiTest, Subscribe)
+TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
 {
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
+  const ContentType contentType = GetParam();
 
-  Call::Subscribe* subscribe = call.mutable_subscribe();
+  ResourceProviderManager manager;
 
-  const v1::Resources resources = v1::Resources::parse("disk:4").get();
-  subscribe->mutable_resources()->CopyFrom(resources);
+  Option<UUID> streamId;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
 
-  mesos::v1::ResourceProviderInfo* info =
-    subscribe->mutable_resource_provider_info();
+  // First, subscribe to the manager to get the ID.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
 
-  info->set_type("org.apache.mesos.rp.test");
-  info->set_name("test");
+    Call::Subscribe* subscribe = call.mutable_subscribe();
 
-  const ContentType contentType = GetParam();
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
 
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  request.headers["Accept"] = stringify(contentType);
-  request.headers["Content-Type"] = stringify(contentType);
-  request.body = serialize(contentType, call);
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
 
-  ResourceProviderManager manager;
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
 
-  Future<http::Response> response = manager.api(request, None());
+    Future<http::Response> response = manager.api(request, None());
 
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
 
-  Option<http::Pipe::Reader> reader = response->reader;
-  ASSERT_SOME(reader);
+    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+    Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
 
-  recordio::Reader<Event> responseDecoder(
-      ::recordio::Decoder<Event>(
-          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
-      reader.get());
+    CHECK_SOME(uuid);
+    streamId = uuid.get();
 
-  Future<Result<Event>> event = responseDecoder.read();
-  AWAIT_READY(event);
-  ASSERT_SOME(event.get());
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
 
-  // Check event type is subscribed and the resource provider id is set.
-  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+    recordio::Reader<Event> responseDecoder(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get());
 
-  mesos::v1::ResourceProviderID resourceProviderId =
-    event->get().subscribed().provider_id();
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
 
-  EXPECT_FALSE(resourceProviderId.value().empty());
+    // Check event type is subscribed and the resource provider id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
 
-  // The manager will send out a message informing its subscriber
-  // about the newly added resources.
-  Future<ResourceProviderMessage> message = manager.messages().get();
+    resourceProviderId = event->get().subscribed().provider_id();
 
-  AWAIT_READY(message);
+    EXPECT_FALSE(resourceProviderId->value().empty());
+  }
 
-  EXPECT_EQ(
-      ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES,
-      message->type);
+  // Then, update the total resources to the manager.
+  {
+    std::vector<v1::Resource> resources =
+      v1::Resources::fromString("disk:4").get();
+    foreach (v1::Resource& resource, resources) {
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+    }
 
-  // We expect `ResourceProviderID`s to be set for all subscribed resources.
-  // Inject them into the test expectation.
-  Resources expectedResources;
-  foreach (v1::Resource resource, resources) {
-    resource.mutable_provider_id()->CopyFrom(resourceProviderId);
-    expectedResources += devolve(resource);
-  }
+    Call call;
+    call.set_type(Call::UPDATE_STATE);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+    Call::UpdateState* updateState = call.mutable_update_state();
+
+    updateState->mutable_resources()->CopyFrom(v1::Resources(resources));
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+    request.body = serialize(contentType, call);
 
-  EXPECT_EQ(devolve(resourceProviderId), message->updateTotalResources->id);
-  EXPECT_EQ(expectedResources, message->updateTotalResources->total);
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
+
+    // The manager will send out a message informing its subscriber
+    // about the newly added resources.
+    Future<ResourceProviderMessage> message = manager.messages().get();
+
+    AWAIT_READY(message);
+
+    EXPECT_EQ(
+        ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES,
+        message->type);
+    EXPECT_EQ(
+        devolve(resourceProviderId.get()), message->updateTotalResources->id);
+    EXPECT_EQ(devolve(resources), message->updateTotalResources->total);
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/94907305/src/tests/resource_provider_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp
index 862e844..e6960b1 100644
--- a/src/tests/resource_provider_validation_tests.cpp
+++ b/src/tests/resource_provider_validation_tests.cpp
@@ -84,6 +84,29 @@ TEST(ResourceProviderCallValidationTest, UpdateOfferOperationStatus)
   EXPECT_NONE(error);
 }
 
+
+TEST(ResourceProviderCallValidationTest, UpdateState)
+{
+  Call call;
+  call.set_type(Call::UPDATE_STATE);
+
+  // Expecting a resource provider ID and `Call::UpdateState`.
+  Option<Error> error = call::validate(call);
+  EXPECT_SOME(error);
+
+  ResourceProviderID* id = call.mutable_resource_provider_id();
+  id->set_value(UUID::random().toString());
+
+  // Still expecting `Call::UpdateState`.
+  error = call::validate(call);
+  EXPECT_SOME(error);
+
+  call.mutable_update_state();
+
+  error = call::validate(call);
+  EXPECT_NONE(error);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[6/7] mesos git commit: Stopped sending checkpoint resources message on agent re-registration.

Posted by ji...@apache.org.
Stopped sending checkpoint resources message on agent re-registration.

Given that resource provider capable agents send an UpdateSlaveMessage
to the master during re-registration, the master no longer needs to
send a CheckpointResourcesMessage to these agents.

This also makes the code more consistent, because the agent should be
the source of truth. It also eliminates the possible retry incurred by
this message, which was never intended.

Review: https://reviews.apache.org/r/62879


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33d1ff17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33d1ff17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33d1ff17

Branch: refs/heads/master
Commit: 33d1ff1798f8cbf83b4e5f7bc79dbf8e231dff1f
Parents: e9ac9f8
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Oct 10 20:17:53 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sun Oct 29 15:57:28 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 62 ++++++++++++++++++++++++++--------------------
 1 file changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/33d1ff17/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c118b9d..5b2c9a0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6318,41 +6318,49 @@ void Master::_reregisterSlave(
 
     slaves.reregistering.erase(slaveInfo.id());
 
-    // Send checkpointed resources to the agent. This is important for
+    // If the agent is not resource provider capable (legacy agent),
+    // send checkpointed resources to the agent. This is important for
     // the cases where the master didn't fail over. In that case, the
     // master might have already applied an operation that the agent
     // didn't see (e.g., due to a breaking connection). This message
     // will sync the state between the master and the agent about
     // checkpointed resources.
-    CheckpointResourcesMessage message;
-
-    message.mutable_resources()->CopyFrom(slave->checkpointedResources);
-
-    if (!slave->capabilities.reservationRefinement) {
-      // If the agent is not refinement-capable, don't send it
-      // checkpointed resources that contain refined reservations. This
-      // might occur if a reservation refinement is created but never
-      // reaches the agent (e.g., due to network partition), and then
-      // the agent is downgraded before the partition heals.
-      //
-      // TODO(neilc): It would probably be better to prevent the agent
-      // from re-registering in this scenario.
-      Try<Nothing> result = downgradeResources(message.mutable_resources());
-      if (result.isError()) {
-        LOG(WARNING) << "Not sending updated checkpointed resouces "
-                     << slave->checkpointedResources
-                     << " with refined reservations, since agent " << *slave
-                     << " is not RESERVATION_REFINEMENT-capable.";
-
-        return;
+    //
+    // New agents that are resource provider capable will always
+    // update the master with total resources during re-registration.
+    // Therefore, no need to send checkpointed resources to the new
+    // agent in this case.
+    if (!slave->capabilities.resourceProvider) {
+      CheckpointResourcesMessage message;
+
+      message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+      if (!slave->capabilities.reservationRefinement) {
+        // If the agent is not refinement-capable, don't send it
+        // checkpointed resources that contain refined reservations. This
+        // might occur if a reservation refinement is created but never
+        // reaches the agent (e.g., due to network partition), and then
+        // the agent is downgraded before the partition heals.
+        //
+        // TODO(neilc): It would probably be better to prevent the agent
+        // from re-registering in this scenario.
+        Try<Nothing> result = downgradeResources(message.mutable_resources());
+        if (result.isError()) {
+          LOG(WARNING) << "Not sending updated checkpointed resouces "
+                       << slave->checkpointedResources
+                       << " with refined reservations, since agent " << *slave
+                       << " is not RESERVATION_REFINEMENT-capable.";
+
+          return;
+        }
       }
-    }
 
-    LOG(INFO) << "Sending updated checkpointed resources "
-              << slave->checkpointedResources
-              << " to agent " << *slave;
+      LOG(INFO) << "Sending updated checkpointed resources "
+                << slave->checkpointedResources
+                << " to agent " << *slave;
 
-    send(slave->pid, message);
+      send(slave->pid, message);
+    }
 
     return;
   }
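
After this change, `Master::_reregisterSlave()` sends a
`CheckpointResourcesMessage` only to legacy agents, and to a legacy agent
without reservation refinement support only if its checkpointed resources
can be downgraded. The following is a minimal sketch of that decision as
a pure function, assuming a much simplified resource model. The
`AgentCapabilities` and `Resource` structs and the `downgradeSucceeds()`,
`checkpointMessageFor()` and `exampleReregistration()` functions are
hypothetical; the real code works on `Resources` protobufs and calls
`downgradeResources()`, which also rewrites the resource format.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical model of the agent capabilities consulted on
// re-registration; not the real Mesos types.
struct AgentCapabilities {
  bool resourceProvider = false;
  bool reservationRefinement = false;
};

struct Resource {
  std::string name;
  bool refinedReservation = false;  // True if the reservation is refined.
};

// Hypothetical stand-in for `downgradeResources()`: fails if any resource
// carries a refined reservation. The format conversion the real function
// performs on success is omitted.
bool downgradeSucceeds(const std::vector<Resource>& resources) {
  for (const Resource& resource : resources) {
    if (resource.refinedReservation) {
      return false;
    }
  }
  return true;
}

// Returns the resources the master would send in a
// `CheckpointResourcesMessage`, or std::nullopt if nothing is sent.
std::optional<std::vector<Resource>> checkpointMessageFor(
    const AgentCapabilities& capabilities,
    const std::vector<Resource>& checkpointed) {
  // Resource provider capable agents report their own total resources
  // during re-registration, so the master does not push its view.
  if (capabilities.resourceProvider) {
    return std::nullopt;
  }

  // Legacy agents without refinement support are skipped if the
  // resources cannot be downgraded (the real code logs a warning).
  if (!capabilities.reservationRefinement &&
      !downgradeSucceeds(checkpointed)) {
    return std::nullopt;
  }

  return checkpointed;
}

// Exercises each branch of the decision.
void exampleReregistration() {
  const std::vector<Resource> plain{Resource{"cpus", false}};
  const std::vector<Resource> refined{Resource{"disk", true}};

  // Resource provider capable agent: never sent the message.
  assert(!checkpointMessageFor({true, true}, plain).has_value());

  // Legacy, refinement-capable agent: always sent the message.
  assert(checkpointMessageFor({false, true}, refined).has_value());

  // Legacy agent without refinement support: skipped only if refined.
  assert(!checkpointMessageFor({false, false}, refined).has_value());
  assert(checkpointMessageFor({false, false}, plain).has_value());
}
```

Checking the resource provider capability first mirrors the patch: the
refinement and downgrade logic is simply nested inside the legacy-agent
branch, so new agents never reach it.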