You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/13 18:11:28 UTC

[1/3] mesos git commit: Provided handling for offer operation updates.

Repository: mesos
Updated Branches:
  refs/heads/master cf543bafc -> 663b893a0


Provided handling for offer operation updates.

When a resource provider has finished an offer operation, it will send
a status update to the resource provider manager and subsequently to an
agent. The agent then updates its internal bookkeeping and forwards the
status update to the master.

Review: https://reviews.apache.org/r/63622/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c946615e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c946615e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c946615e

Branch: refs/heads/master
Commit: c946615ec6d0deb26aae984d99097ae6bc02058d
Parents: cf543ba
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:15 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:15 2017 -0800

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp                 |   9 +-
 src/common/protobuf_utils.hpp                 |   3 +-
 src/resource_provider/manager.cpp             |  14 +-
 src/resource_provider/message.hpp             |  28 ++-
 src/slave/slave.cpp                           | 223 +++++++++++++++++++++
 src/slave/slave.hpp                           |  18 ++
 src/tests/resource_provider_manager_tests.cpp | 117 +++++++++++
 7 files changed, 406 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 5739a63..3405484 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -444,14 +444,19 @@ OfferOperation createOfferOperation(
     const Offer::Operation& info,
     const OfferOperationStatus& latestStatus,
     const FrameworkID& frameworkId,
-    const SlaveID& slaveId)
+    const SlaveID& slaveId,
+    const Option<UUID>& operationUUID)
 {
   OfferOperation operation;
   operation.mutable_framework_id()->CopyFrom(frameworkId);
   operation.mutable_slave_id()->CopyFrom(slaveId);
   operation.mutable_info()->CopyFrom(info);
   operation.mutable_latest_status()->CopyFrom(latestStatus);
-  operation.set_operation_uuid(UUID::random().toBytes());
+  if (operationUUID.isSome()) {
+    operation.set_operation_uuid(operationUUID->toBytes());
+  } else {
+    operation.set_operation_uuid(UUID::random().toBytes());
+  }
 
   return operation;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0ca4c6d..b2aa365 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -164,7 +164,8 @@ OfferOperation createOfferOperation(
     const Offer::Operation& info,
     const OfferOperationStatus& latestStatus,
     const FrameworkID& frameworkId,
-    const SlaveID& slaveId);
+    const SlaveID& slaveId,
+    const Option<UUID>& operationUUID = None());
 
 
 // Helper function that creates a MasterInfo from UPID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index bcc833b..6dfc429 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -415,7 +415,19 @@ void ResourceProviderManagerProcess::updateOfferOperationStatus(
     ResourceProvider* resourceProvider,
     const Call::UpdateOfferOperationStatus& update)
 {
-  // TODO(nfnt): Implement the 'UPDATE_OFFER_OPERATION_STATUS' call handler.
+  ResourceProviderMessage::UpdateOfferOperationStatus body;
+  body.update.mutable_framework_id()->CopyFrom(update.framework_id());
+  body.update.mutable_status()->CopyFrom(update.status());
+  body.update.set_operation_uuid(update.operation_uuid());
+  if (update.has_latest_status()) {
+    body.update.mutable_latest_status()->CopyFrom(update.latest_status());
+  }
+
+  ResourceProviderMessage message;
+  message.type = ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS;
+  message.updateOfferOperationStatus = std::move(body);
+
+  messages.put(std::move(message));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index a1a84c1..05879cd 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -23,9 +23,13 @@
 #include <mesos/resources.hpp>
 
 #include <stout/check.hpp>
+#include <stout/jsonify.hpp>
 #include <stout/option.hpp>
+#include <stout/protobuf.hpp>
 #include <stout/unreachable.hpp>
 
+#include "messages/messages.hpp"
+
 namespace mesos {
 namespace internal {
 
@@ -33,7 +37,8 @@ struct ResourceProviderMessage
 {
   enum class Type
   {
-    UPDATE_TOTAL_RESOURCES
+    UPDATE_TOTAL_RESOURCES,
+    UPDATE_OFFER_OPERATION_STATUS
   };
 
   struct UpdateTotalResources
@@ -43,9 +48,15 @@ struct ResourceProviderMessage
     Resources total;
   };
 
+  struct UpdateOfferOperationStatus
+  {
+    OfferOperationStatusUpdate update;
+  };
+
   Type type;
 
   Option<UpdateTotalResources> updateTotalResources;
+  Option<UpdateOfferOperationStatus> updateOfferOperationStatus;
 };
 
 
@@ -54,7 +65,7 @@ inline std::ostream& operator<<(
     const ResourceProviderMessage& resourceProviderMessage)
 {
   switch (resourceProviderMessage.type) {
-    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES:
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES: {
       const Option<ResourceProviderMessage::UpdateTotalResources>&
         updateTotalResources = resourceProviderMessage.updateTotalResources;
 
@@ -64,6 +75,19 @@ inline std::ostream& operator<<(
           << "UPDATE_TOTAL_RESOURCES: "
           << updateTotalResources->id << " "
           << updateTotalResources->total;
+    }
+
+    case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {
+      const Option<ResourceProviderMessage::UpdateOfferOperationStatus>&
+        updateOfferOperationStatus =
+          resourceProviderMessage.updateOfferOperationStatus;
+
+      CHECK_SOME(updateOfferOperationStatus);
+
+      return stream
+          << "UPDATE_OFFER_OPERATION_STATUS: "
+          << jsonify(JSON::Protobuf(updateOfferOperationStatus->update));
+    }
   }
 
   UNREACHABLE();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7cb6661..d8baceb 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3703,6 +3703,16 @@ void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
     return;
   }
 
+  OfferOperation* offerOperation = new OfferOperation(
+      protobuf::createOfferOperation(
+          message.operation_info(),
+          protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+          message.framework_id(),
+          info.id(),
+          uuid.get()));
+
+  addOfferOperation(offerOperation);
+
   if (resourceProviderId.isSome()) {
     resourceProviderManager.applyOfferOperation(message);
     return;
@@ -6772,6 +6782,59 @@ void Slave::handleResourceProviderMessage(
       }
       break;
     }
+    case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {
+      CHECK_SOME(message->updateOfferOperationStatus);
+
+      const OfferOperationStatusUpdate& update =
+        message->updateOfferOperationStatus->update;
+
+      Try<UUID> operationUUID = UUID::fromBytes(update.operation_uuid());
+      CHECK_SOME(operationUUID);
+
+      OfferOperation* operation = getOfferOperation(operationUUID.get());
+      if (operation == nullptr) {
+        LOG(WARNING) << "Failed to find the offer operation '"
+                     << update.status().operation_id() << "' (uuid: "
+                     << operationUUID->toString() << ") for framework "
+                     << update.framework_id();
+        return;
+      }
+
+      updateOfferOperation(operation, update);
+
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          LOG(WARNING) << "Dropping status update of offer operation '"
+                       << update.status().operation_id() << "' (uuid: "
+                       << operationUUID->toString() << ") for framework "
+                       << update.framework_id() << " because agent is in "
+                       << state << " state";
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding status update of offer operation '"
+                    << update.status().operation_id()
+                    << "' (uuid: " << operationUUID->toString()
+                    << ") for framework " << update.framework_id();
+
+          // The status update from the resource provider didn't
+          // provide the agent ID (because the resource provider doesn't
+          // know it), hence we inject it here.
+          OfferOperationStatusUpdate _update;
+          _update.CopyFrom(update);
+          _update.mutable_slave_id()->CopyFrom(info.id());
+
+          send(master.get(), _update);
+          break;
+        }
+      }
+
+      if (protobuf::isTerminalState(operation->latest_status().state())) {
+        removeOfferOperation(operation);
+      }
+    }
   }
 
   // Wait for the next message.
@@ -6780,6 +6843,166 @@ void Slave::handleResourceProviderMessage(
 }
 
 
+void Slave::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+void Slave::updateOfferOperation(
+    OfferOperation* operation,
+    const OfferOperationStatusUpdate& update)
+{
+  CHECK_NOTNULL(operation);
+
+  const OfferOperationStatus& status = update.status();
+
+  Option<OfferOperationStatus> latestStatus;
+  if (update.has_latest_status()) {
+    latestStatus = update.latest_status();
+  }
+
+  // Whether the offer operation has just become terminated.
+  Option<bool> terminated;
+
+  if (latestStatus.isSome()) {
+    terminated =
+      !protobuf::isTerminalState(operation->latest_status().state()) &&
+      protobuf::isTerminalState(latestStatus->state());
+
+    // If the operation has already transitioned to a terminal state,
+    // do not update its state.
+    if (!protobuf::isTerminalState(operation->latest_status().state())) {
+      operation->mutable_latest_status()->CopyFrom(latestStatus.get());
+    }
+  } else {
+    terminated =
+      !protobuf::isTerminalState(operation->latest_status().state()) &&
+      protobuf::isTerminalState(status.state());
+
+    if (!protobuf::isTerminalState(operation->latest_status().state())) {
+      operation->mutable_latest_status()->CopyFrom(status);
+    }
+  }
+
+  operation->add_statuses()->CopyFrom(status);
+
+  LOG(INFO) << "Updating the state of offer operation '"
+            << operation->info().id()
+            << "' (uuid: " << operation->operation_uuid()
+            << ") of framework " << operation->framework_id()
+            << " (latest state: " << operation->latest_status().state()
+            << ", status update state: " << status.state() << ")";
+
+  CHECK_SOME(terminated);
+
+  if (!terminated.get()) {
+    return;
+  }
+
+  Resource consumed;
+  switch (operation->info().type()) {
+    case Offer::Operation::LAUNCH:
+      LOG(FATAL) << "Unexpected LAUNCH operation";
+      break;
+    case Offer::Operation::LAUNCH_GROUP:
+      LOG(FATAL) << "Unexpected LAUNCH_GROUP operation";
+      break;
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE:
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY:
+      return;
+    case Offer::Operation::CREATE_VOLUME:
+      consumed = operation->info().create_volume().source();
+      break;
+    case Offer::Operation::DESTROY_VOLUME:
+      consumed = operation->info().destroy_volume().volume();
+      break;
+    case Offer::Operation::CREATE_BLOCK:
+      consumed = operation->info().create_block().source();
+      break;
+    case Offer::Operation::DESTROY_BLOCK:
+      consumed = operation->info().destroy_block().block();
+      break;
+    case Offer::Operation::UNKNOWN:
+      LOG(WARNING) << "Unknown offer operation";
+      return;
+  }
+
+  switch (update.latest_status().state()) {
+    // Terminal state, and the conversion is successful.
+    case OFFER_OPERATION_FINISHED: {
+      // 'totalResources' don't have allocations set, we need
+      // to remove them from the consumed and converted resources.
+      if (consumed.has_allocation_info()) {
+        consumed.clear_allocation_info();
+      }
+
+      Resources converted =
+        update.latest_status().converted_resources();
+      converted.unallocate();
+
+      ResourceConversion conversion(consumed, converted);
+
+      apply({conversion});
+
+      break;
+    }
+
+    // Terminal state, and the conversion has failed.
+    case OFFER_OPERATION_FAILED:
+    case OFFER_OPERATION_ERROR: {
+      break;
+    }
+
+    // Non-terminal. This shouldn't happen.
+    case OFFER_OPERATION_PENDING:
+    case OFFER_OPERATION_UNSUPPORTED: {
+      LOG(FATAL) << "Unexpected offer operation state "
+                 << operation->latest_status().state();
+    }
+  }
+}
+
+
+void Slave::removeOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(offerOperations.contains(uuid.get()))
+    << "Unknown offer operation (uuid: " << uuid->toString() << ")";
+
+  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
+    << operation->latest_status().state();
+
+  offerOperations.erase(uuid.get());
+  delete operation;
+}
+
+
+OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
+{
+  if (offerOperations.contains(uuid)) {
+    return offerOperations.at(uuid);
+  }
+  return nullptr;
+}
+
+
+void Slave::apply(const vector<ResourceConversion>& conversions)
+{
+  Try<Resources> resources = totalResources.apply(conversions);
+  CHECK_SOME(resources);
+
+  totalResources = resources.get();
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0124df4..c0acaa6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -533,6 +533,20 @@ private:
   void handleResourceProviderMessage(
       const process::Future<ResourceProviderMessage>& message);
 
+  void addOfferOperation(OfferOperation* operation);
+
+  // Transitions the offer operation, and recovers resource if the
+  // offer operation becomes terminal.
+  void updateOfferOperation(
+      OfferOperation* operation,
+      const OfferOperationStatusUpdate& update);
+
+  void removeOfferOperation(OfferOperation* operation);
+
+  OfferOperation* getOfferOperation(const UUID& uuid) const;
+
+  void apply(const std::vector<ResourceConversion>& conversions);
+
   // Gauge methods.
   double _frameworks_active()
   {
@@ -667,6 +681,10 @@ private:
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
   hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
 
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates.
+  hashmap<UUID, OfferOperation*> offerOperations;
+
 protected:
   // Made protected for testing purposes.
   mesos::SecretGenerator* secretGenerator;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 4008b1c..5624b23 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -316,6 +316,123 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
 }
 
 
+TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
+{
+  const ContentType contentType = GetParam();
+
+  ResourceProviderManager manager;
+
+  Option<UUID> streamId;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+
+  // First, subscribe to the manager to get the ID.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+    Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+    CHECK_SOME(uuid);
+    streamId = uuid.get();
+
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
+
+    recordio::Reader<Event> responseDecoder(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get());
+
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    // Check event type is subscribed and the resource provider id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+    resourceProviderId = event->get().subscribed().provider_id();
+
+    EXPECT_FALSE(resourceProviderId->value().empty());
+  }
+
+  // Then, send an offer operation update to the manager.
+  {
+    v1::FrameworkID frameworkId;
+    frameworkId.set_value("foo");
+
+    mesos::v1::OfferOperationStatus status;
+    status.set_state(mesos::v1::OfferOperationState::OFFER_OPERATION_FINISHED);
+
+    UUID operationUUID = UUID::random();
+
+    Call call;
+    call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+    Call::UpdateOfferOperationStatus* updateOfferOperationStatus =
+      call.mutable_update_offer_operation_status();
+    updateOfferOperationStatus->mutable_framework_id()->CopyFrom(frameworkId);
+    updateOfferOperationStatus->mutable_status()->CopyFrom(status);
+    updateOfferOperationStatus->set_operation_uuid(operationUUID.toBytes());
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
+
+    // The manager will send out a message informing its subscriber
+    // about the updated offer operation.
+    Future<ResourceProviderMessage> message = manager.messages().get();
+
+    AWAIT_READY(message);
+
+    EXPECT_EQ(
+        ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS,
+        message->type);
+    EXPECT_EQ(
+        devolve(resourceProviderId.get()),
+        message->updateOfferOperationStatus->id);
+    EXPECT_EQ(
+        devolve(frameworkId),
+        message->updateOfferOperationStatus->update.framework_id());
+    EXPECT_EQ(
+        devolve(status).state(),
+        message->updateOfferOperationStatus->update.status().state());
+    EXPECT_EQ(
+        operationUUID.toBytes(),
+        message->updateOfferOperationStatus->update.operation_uuid());
+  }
+}
+
+
 // This test starts an agent and connects directly with its resource
 // provider endpoint.
 TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)


[3/3] mesos git commit: Added a test for resource conversion using a resource provider.

Posted by ji...@apache.org.
Added a test for resource conversion using a resource provider.

Review: https://reviews.apache.org/r/63625/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/663b893a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/663b893a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/663b893a

Branch: refs/heads/master
Commit: 663b893a0badb518d6572566a11fd015d76bf429
Parents: 560942d
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:28 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:28 2017 -0800

----------------------------------------------------------------------
 src/tests/resource_provider_manager_tests.cpp | 288 ++++++++++++++++++++-
 1 file changed, 284 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/663b893a/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 5624b23..ecfe2b4 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -15,6 +15,7 @@
 // limitations under the License.
 
 #include <string>
+#include <vector>
 
 #include <gtest/gtest.h>
 
@@ -33,6 +34,8 @@
 #include <process/gtest.hpp>
 #include <process/http.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/gtest.hpp>
@@ -70,10 +73,12 @@ using mesos::resource_provider::Registrar;
 using mesos::resource_provider::RemoveResourceProvider;
 
 using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Driver;
 using mesos::v1::resource_provider::Event;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::Owned;
 using process::PID;
 
@@ -83,7 +88,10 @@ using process::http::OK;
 using process::http::UnsupportedMediaType;
 
 using std::string;
+using std::vector;
 
+using testing::Eq;
+using testing::SaveArg;
 using testing::Values;
 using testing::WithParamInterface;
 
@@ -93,7 +101,66 @@ namespace tests {
 
 class ResourceProviderManagerHttpApiTest
   : public MesosTest,
-    public WithParamInterface<ContentType> {};
+    public WithParamInterface<ContentType>
+{
+public:
+  Future<Nothing> subscribe(
+      v1::MockResourceProvider* resourceProvider,
+      const mesos::v1::ResourceProviderInfo& info)
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+    return resourceProvider->send(call);
+  }
+
+  Future<Nothing> updateState(
+      v1::MockResourceProvider* resourceProvider,
+      const mesos::v1::ResourceProviderID& resourceProviderId,
+      const v1::Resources& resources)
+  {
+    Call call;
+    call.set_type(Call::UPDATE_STATE);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+    Call::UpdateState* updateState = call.mutable_update_state();
+    updateState->mutable_resources()->CopyFrom(resources);
+
+    updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+    return resourceProvider->send(call);
+  }
+
+  Future<Nothing> updateOfferOperationStatus(
+      v1::MockResourceProvider* resourceProvider,
+      const mesos::v1::ResourceProviderID& resourceProviderId,
+      const mesos::v1::FrameworkID& frameworkId,
+      const UUID& operationUUID,
+      const v1::Resources& convertedResources)
+  {
+    Call call;
+    call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+    Call::UpdateOfferOperationStatus* updateOfferOperationStatus =
+      call.mutable_update_offer_operation_status();
+
+    updateOfferOperationStatus->mutable_framework_id()->CopyFrom(frameworkId);
+
+    mesos::v1::OfferOperationStatus* status =
+      updateOfferOperationStatus->mutable_status();
+    status->mutable_converted_resources()->CopyFrom(convertedResources);
+    status->set_state(mesos::v1::OfferOperationState::OFFER_OPERATION_FINISHED);
+
+    updateOfferOperationStatus->mutable_latest_status()->CopyFrom(*status);
+
+    updateOfferOperationStatus->set_operation_uuid(operationUUID.toBytes());
+
+    return resourceProvider->send(call);
+  }
+};
 
 
 // The tests are parameterized by the content type of the request.
@@ -418,9 +485,6 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
         ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS,
         message->type);
     EXPECT_EQ(
-        devolve(resourceProviderId.get()),
-        message->updateOfferOperationStatus->id);
-    EXPECT_EQ(
         devolve(frameworkId),
         message->updateOfferOperationStatus->update.framework_id());
     EXPECT_EQ(
@@ -580,6 +644,222 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
       new RemoveResourceProvider(resourceProviderId))));
 }
 
+
+// Test that resource provider resources are offered to frameworks,
+// frameworks can accept the offer with an operation that has a resource
+// provider convert resources and that the converted resources are
+// offered to frameworks as well.
+TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
+{
+  // Start master and agent.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    SlaveInfo::Capability* capability =
+      slaveFlags.agent_features->add_capabilities();
+    capability->set_type(type);
+  }
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  v1::MockResourceProvider resourceProvider;
+
+  // Start and register a resource provider.
+  {
+    Future<Nothing> connected;
+    EXPECT_CALL(resourceProvider, connected())
+      .WillOnce(FutureSatisfy(&connected));
+
+    string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+    if (process::network::openssl::flags().enabled) {
+      scheme = "https";
+    }
+#endif
+
+    http::URL url(
+        scheme,
+        agent.get()->pid.address.ip,
+        agent.get()->pid.address.port,
+        agent.get()->pid.id + "/api/v1/resource_provider");
+
+    Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+    const ContentType contentType = GetParam();
+
+    resourceProvider.start(
+        endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+    AWAIT_READY(connected);
+  }
+
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+
+  {
+    Future<Event::Subscribed> subscribed;
+    EXPECT_CALL(resourceProvider, subscribed(_))
+      .WillOnce(FutureArg<0>(&subscribed));
+
+    mesos::v1::ResourceProviderInfo resourceProviderInfo;
+    resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+    resourceProviderInfo.set_name("test");
+
+    AWAIT_READY(subscribe(&resourceProvider, resourceProviderInfo));
+
+    AWAIT_READY(subscribed);
+
+    resourceProviderId = subscribed->provider_id();
+
+    ASSERT_FALSE(resourceProviderId->value().empty());
+  }
+
+  v1::Resource disk = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+  disk.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  AWAIT_READY(updateState(&resourceProvider, resourceProviderId.get(), disk));
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a framework.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_)).WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Option<v1::FrameworkID> frameworkId;
+  Option<mesos::v1::Offer> offer;
+
+  {
+    Future<v1::scheduler::Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    Future<v1::scheduler::Event::Offers> offers;
+    EXPECT_CALL(*scheduler, offers(_, _))
+      .WillOnce(FutureArg<1>(&offers))
+      .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    mesos::v1::scheduler::Call call;
+    call.set_type(mesos::v1::scheduler::Call::SUBSCRIBE);
+    mesos::v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
+
+    mesos.send(call);
+
+    AWAIT_READY(subscribed);
+
+    frameworkId = subscribed->framework_id();
+
+    // Resource provider resources will be offered to the framework.
+    AWAIT_READY(offers);
+    EXPECT_FALSE(offers->offers().empty());
+    offer = offers->offers(0);
+  }
+
+  Future<Event::Operation> operation;
+
+  // Accept the offer with a 'CREATE_BLOCK' operation.
+  EXPECT_CALL(resourceProvider, operation(_))
+    .WillOnce(FutureArg<0>(&operation));
+
+  mesos.send(v1::createCallAccept(
+      frameworkId.get(),
+      offer.get(),
+      {v1::CREATE_BLOCK(disk)}));
+
+  AWAIT_READY(operation);
+
+  // Have the resource provider handle the operation, converting its resource.
+  {
+    ASSERT_TRUE(operation->info().has_create_block());
+
+    mesos::v1::Resource convertedResource;
+    convertedResource.CopyFrom(operation->info().create_block().source());
+    convertedResource.mutable_disk()->mutable_source()->set_type(
+        mesos::v1::Resource::DiskInfo::Source::BLOCK);
+
+    Try<UUID> operationUUID = UUID::fromBytes(operation->operation_uuid());
+    CHECK_SOME(operationUUID);
+
+    Clock::pause();
+
+    Future<v1::scheduler::Event::Offers> offers;
+    EXPECT_CALL(*scheduler, offers(_, _))
+      .WillOnce(FutureArg<1>(&offers))
+      .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+    AWAIT_READY(updateOfferOperationStatus(
+        &resourceProvider,
+        resourceProviderId.get(),
+        operation->framework_id(),
+        operationUUID.get(),
+        convertedResource));
+
+    Clock::advance(masterFlags.allocation_interval);
+    Clock::settle();
+    Clock::resume();
+
+    // The converted resource should be offered to the framework.
+    AWAIT_READY(offers);
+    EXPECT_FALSE(offers->offers().empty());
+
+    const v1::Offer& offer = offers->offers(0);
+
+    Option<v1::Resource> block;
+    foreach (const v1::Resource& resource, offer.resources()) {
+      if (resource.has_provider_id()) {
+        block = resource;
+      }
+    }
+
+    ASSERT_SOME(block);
+    EXPECT_EQ(
+        v1::Resource::DiskInfo::Source::BLOCK,
+        block->disk().source().type());
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/3] mesos git commit: Added test helpers for storage operations.

Posted by ji...@apache.org.
Added test helpers for storage operations.

Review: https://reviews.apache.org/r/63679/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/560942dd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/560942dd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/560942dd

Branch: refs/heads/master
Commit: 560942ddab37dc341c4889c158149de3ab987d01
Parents: c946615
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:23 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:23 2017 -0800

----------------------------------------------------------------------
 src/tests/mesos.hpp | 107 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 107 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/560942dd/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index e25defe..4e2513f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1279,6 +1279,49 @@ inline typename TOffer::Operation LAUNCH_GROUP(
 }
 
 
+template <typename TResource, typename TTargetType, typename TOffer>
+inline typename TOffer::Operation CREATE_VOLUME(
+    const TResource& source,
+    const TTargetType& type)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::CREATE_VOLUME);
+  operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
+  operation.set_target_type(type);
+  return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation DESTROY_VOLUME(const TResource& volume)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::DESTROY_VOLUME);
+  operation.mutable_destroy_volume()->mutable_volume()->CopyFrom(volume);
+  return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation CREATE_BLOCK(const TResource& source)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::CREATE_BLOCK);
+  operation.mutable_create_block()->mutable_source()->CopyFrom(source);
+  return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation DESTROY_BLOCK(const TResource& block)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::DESTROY_BLOCK);
+  operation.mutable_destroy_block()->mutable_block()->CopyFrom(block);
+  return operation;
+}
+
+
 template <typename TParameters, typename TParameter>
 inline TParameters parameterize(const ACLs& acls)
 {
@@ -1595,6 +1638,36 @@ inline Offer::Operation LAUNCH_GROUP(Args&&... args)
 
 
 template <typename... Args>
+inline Offer::Operation CREATE_VOLUME(Args&&... args)
+{
+  return common::CREATE_VOLUME<Resource,
+                               Resource::DiskInfo::Source::Type,
+                               Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation DESTROY_VOLUME(Args&&... args)
+{
+  return common::DESTROY_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation CREATE_BLOCK(Args&&... args)
+{
+  return common::CREATE_BLOCK<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation DESTROY_BLOCK(Args&&... args)
+{
+  return common::DESTROY_BLOCK<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
 inline Parameters parameterize(Args&&... args)
 {
   return common::parameterize<Parameters, Parameter>(
@@ -1865,6 +1938,40 @@ inline mesos::v1::Offer::Operation LAUNCH_GROUP(Args&&... args)
 
 
 template <typename... Args>
+inline mesos::v1::Offer::Operation CREATE_VOLUME(Args&&... args)
+{
+  return common::CREATE_VOLUME<mesos::v1::Resource,
+                               mesos::v1::Resource::DiskInfo::Source::Type,
+                               mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation DESTROY_VOLUME(Args&&... args)
+{
+  return common::DESTROY_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation CREATE_BLOCK(Args&&... args)
+{
+  return common::CREATE_BLOCK<mesos::v1::Resource, mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation DESTROY_BLOCK(Args&&... args)
+{
+  return common::DESTROY_BLOCK<mesos::v1::Resource, mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
 inline mesos::v1::Parameters parameterize(Args&&... args)
 {
   return common::parameterize<mesos::v1::Parameters, mesos::v1::Parameter>(