You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/13 18:11:28 UTC
[1/3] mesos git commit: Provided handling for offer operation updates.
Repository: mesos
Updated Branches:
refs/heads/master cf543bafc -> 663b893a0
Provided handling for offer operation updates.
When a resource provider has finished an offer operation, it will send
a status update to the resource provider manager and subsequently to an
agent. The agent then updates its internal bookkeeping and forwards the
status update to the master.
Review: https://reviews.apache.org/r/63622/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c946615e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c946615e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c946615e
Branch: refs/heads/master
Commit: c946615ec6d0deb26aae984d99097ae6bc02058d
Parents: cf543ba
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:15 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:15 2017 -0800
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 9 +-
src/common/protobuf_utils.hpp | 3 +-
src/resource_provider/manager.cpp | 14 +-
src/resource_provider/message.hpp | 28 ++-
src/slave/slave.cpp | 223 +++++++++++++++++++++
src/slave/slave.hpp | 18 ++
src/tests/resource_provider_manager_tests.cpp | 117 +++++++++++
7 files changed, 406 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 5739a63..3405484 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -444,14 +444,19 @@ OfferOperation createOfferOperation(
const Offer::Operation& info,
const OfferOperationStatus& latestStatus,
const FrameworkID& frameworkId,
- const SlaveID& slaveId)
+ const SlaveID& slaveId,
+ const Option<UUID>& operationUUID)
{
OfferOperation operation;
operation.mutable_framework_id()->CopyFrom(frameworkId);
operation.mutable_slave_id()->CopyFrom(slaveId);
operation.mutable_info()->CopyFrom(info);
operation.mutable_latest_status()->CopyFrom(latestStatus);
- operation.set_operation_uuid(UUID::random().toBytes());
+ if (operationUUID.isSome()) {
+ operation.set_operation_uuid(operationUUID->toBytes());
+ } else {
+ operation.set_operation_uuid(UUID::random().toBytes());
+ }
return operation;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0ca4c6d..b2aa365 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -164,7 +164,8 @@ OfferOperation createOfferOperation(
const Offer::Operation& info,
const OfferOperationStatus& latestStatus,
const FrameworkID& frameworkId,
- const SlaveID& slaveId);
+ const SlaveID& slaveId,
+ const Option<UUID>& operationUUID = None());
// Helper function that creates a MasterInfo from UPID.
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index bcc833b..6dfc429 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -415,7 +415,19 @@ void ResourceProviderManagerProcess::updateOfferOperationStatus(
ResourceProvider* resourceProvider,
const Call::UpdateOfferOperationStatus& update)
{
- // TODO(nfnt): Implement the 'UPDATE_OFFER_OPERATION_STATUS' call handler.
+ ResourceProviderMessage::UpdateOfferOperationStatus body;
+ body.update.mutable_framework_id()->CopyFrom(update.framework_id());
+ body.update.mutable_status()->CopyFrom(update.status());
+ body.update.set_operation_uuid(update.operation_uuid());
+ if (update.has_latest_status()) {
+ body.update.mutable_latest_status()->CopyFrom(update.latest_status());
+ }
+
+ ResourceProviderMessage message;
+ message.type = ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS;
+ message.updateOfferOperationStatus = std::move(body);
+
+ messages.put(std::move(message));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index a1a84c1..05879cd 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -23,9 +23,13 @@
#include <mesos/resources.hpp>
#include <stout/check.hpp>
+#include <stout/jsonify.hpp>
#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
#include <stout/unreachable.hpp>
+#include "messages/messages.hpp"
+
namespace mesos {
namespace internal {
@@ -33,7 +37,8 @@ struct ResourceProviderMessage
{
enum class Type
{
- UPDATE_TOTAL_RESOURCES
+ UPDATE_TOTAL_RESOURCES,
+ UPDATE_OFFER_OPERATION_STATUS
};
struct UpdateTotalResources
@@ -43,9 +48,15 @@ struct ResourceProviderMessage
Resources total;
};
+ struct UpdateOfferOperationStatus
+ {
+ OfferOperationStatusUpdate update;
+ };
+
Type type;
Option<UpdateTotalResources> updateTotalResources;
+ Option<UpdateOfferOperationStatus> updateOfferOperationStatus;
};
@@ -54,7 +65,7 @@ inline std::ostream& operator<<(
const ResourceProviderMessage& resourceProviderMessage)
{
switch (resourceProviderMessage.type) {
- case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES:
+ case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES: {
const Option<ResourceProviderMessage::UpdateTotalResources>&
updateTotalResources = resourceProviderMessage.updateTotalResources;
@@ -64,6 +75,19 @@ inline std::ostream& operator<<(
<< "UPDATE_TOTAL_RESOURCES: "
<< updateTotalResources->id << " "
<< updateTotalResources->total;
+ }
+
+ case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {
+ const Option<ResourceProviderMessage::UpdateOfferOperationStatus>&
+ updateOfferOperationStatus =
+ resourceProviderMessage.updateOfferOperationStatus;
+
+ CHECK_SOME(updateOfferOperationStatus);
+
+ return stream
+ << "UPDATE_OFFER_OPERATION_STATUS: "
+ << jsonify(JSON::Protobuf(updateOfferOperationStatus->update));
+ }
}
UNREACHABLE();
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7cb6661..d8baceb 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3703,6 +3703,16 @@ void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
return;
}
+ OfferOperation* offerOperation = new OfferOperation(
+ protobuf::createOfferOperation(
+ message.operation_info(),
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_PENDING),
+ message.framework_id(),
+ info.id(),
+ uuid.get()));
+
+ addOfferOperation(offerOperation);
+
if (resourceProviderId.isSome()) {
resourceProviderManager.applyOfferOperation(message);
return;
@@ -6772,6 +6782,59 @@ void Slave::handleResourceProviderMessage(
}
break;
}
+ case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {
+ CHECK_SOME(message->updateOfferOperationStatus);
+
+ const OfferOperationStatusUpdate& update =
+ message->updateOfferOperationStatus->update;
+
+ Try<UUID> operationUUID = UUID::fromBytes(update.operation_uuid());
+ CHECK_SOME(operationUUID);
+
+ OfferOperation* operation = getOfferOperation(operationUUID.get());
+ if (operation == nullptr) {
+ LOG(WARNING) << "Failed to find the offer operation '"
+ << update.status().operation_id() << "' (uuid: "
+ << operationUUID->toString() << ") for framework "
+ << update.framework_id();
+ return;
+ }
+
+ updateOfferOperation(operation, update);
+
+ switch (state) {
+ case RECOVERING:
+ case DISCONNECTED:
+ case TERMINATING: {
+ LOG(WARNING) << "Dropping status update of offer operation '"
+ << update.status().operation_id() << "' (uuid: "
+ << operationUUID->toString() << ") for framework "
+ << update.framework_id() << " because agent is in "
+ << state << " state";
+ break;
+ }
+ case RUNNING: {
+ LOG(INFO) << "Forwarding status update of offer operation '"
+ << update.status().operation_id()
+ << "' (uuid: " << operationUUID->toString()
+ << ") for framework " << update.framework_id();
+
+ // The status update from the resource provider didn't
+ // provide the agent ID (because the resource provider doesn't
+ // know it), hence we inject it here.
+ OfferOperationStatusUpdate _update;
+ _update.CopyFrom(update);
+ _update.mutable_slave_id()->CopyFrom(info.id());
+
+ send(master.get(), _update);
+ break;
+ }
+ }
+
+ if (protobuf::isTerminalState(operation->latest_status().state())) {
+ removeOfferOperation(operation);
+ }
+ }
}
// Wait for the next message.
@@ -6780,6 +6843,166 @@ void Slave::handleResourceProviderMessage(
}
+void Slave::addOfferOperation(OfferOperation* operation)
+{
+ Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+ CHECK_SOME(uuid);
+
+ offerOperations.put(uuid.get(), operation);
+}
+
+
+void Slave::updateOfferOperation(
+ OfferOperation* operation,
+ const OfferOperationStatusUpdate& update)
+{
+ CHECK_NOTNULL(operation);
+
+ const OfferOperationStatus& status = update.status();
+
+ Option<OfferOperationStatus> latestStatus;
+ if (update.has_latest_status()) {
+ latestStatus = update.latest_status();
+ }
+
+ // Whether the offer operation has just become terminated.
+ Option<bool> terminated;
+
+ if (latestStatus.isSome()) {
+ terminated =
+ !protobuf::isTerminalState(operation->latest_status().state()) &&
+ protobuf::isTerminalState(latestStatus->state());
+
+ // If the operation has already transitioned to a terminal state,
+ // do not update its state.
+ if (!protobuf::isTerminalState(operation->latest_status().state())) {
+ operation->mutable_latest_status()->CopyFrom(latestStatus.get());
+ }
+ } else {
+ terminated =
+ !protobuf::isTerminalState(operation->latest_status().state()) &&
+ protobuf::isTerminalState(status.state());
+
+ if (!protobuf::isTerminalState(operation->latest_status().state())) {
+ operation->mutable_latest_status()->CopyFrom(status);
+ }
+ }
+
+ operation->add_statuses()->CopyFrom(status);
+
+ LOG(INFO) << "Updating the state of offer operation '"
+ << operation->info().id()
+ << "' (uuid: " << operation->operation_uuid()
+ << ") of framework " << operation->framework_id()
+ << " (latest state: " << operation->latest_status().state()
+ << ", status update state: " << status.state() << ")";
+
+ CHECK_SOME(terminated);
+
+ if (!terminated.get()) {
+ return;
+ }
+
+ Resource consumed;
+ switch (operation->info().type()) {
+ case Offer::Operation::LAUNCH:
+ LOG(FATAL) << "Unexpected LAUNCH operation";
+ break;
+ case Offer::Operation::LAUNCH_GROUP:
+ LOG(FATAL) << "Unexpected LAUNCH_GROUP operation";
+ break;
+ case Offer::Operation::RESERVE:
+ case Offer::Operation::UNRESERVE:
+ case Offer::Operation::CREATE:
+ case Offer::Operation::DESTROY:
+ return;
+ case Offer::Operation::CREATE_VOLUME:
+ consumed = operation->info().create_volume().source();
+ break;
+ case Offer::Operation::DESTROY_VOLUME:
+ consumed = operation->info().destroy_volume().volume();
+ break;
+ case Offer::Operation::CREATE_BLOCK:
+ consumed = operation->info().create_block().source();
+ break;
+ case Offer::Operation::DESTROY_BLOCK:
+ consumed = operation->info().destroy_block().block();
+ break;
+ case Offer::Operation::UNKNOWN:
+ LOG(WARNING) << "Unknown offer operation";
+ return;
+ }
+
+ switch (update.latest_status().state()) {
+ // Terminal state, and the conversion is successful.
+ case OFFER_OPERATION_FINISHED: {
+ // 'totalResources' don't have allocations set, we need
+ // to remove them from the consumed and converted resources.
+ if (consumed.has_allocation_info()) {
+ consumed.clear_allocation_info();
+ }
+
+ Resources converted =
+ update.latest_status().converted_resources();
+ converted.unallocate();
+
+ ResourceConversion conversion(consumed, converted);
+
+ apply({conversion});
+
+ break;
+ }
+
+ // Terminal state, and the conversion has failed.
+ case OFFER_OPERATION_FAILED:
+ case OFFER_OPERATION_ERROR: {
+ break;
+ }
+
+ // Non-terminal. This shouldn't happen.
+ case OFFER_OPERATION_PENDING:
+ case OFFER_OPERATION_UNSUPPORTED: {
+ LOG(FATAL) << "Unexpected offer operation state "
+ << operation->latest_status().state();
+ }
+ }
+}
+
+
+void Slave::removeOfferOperation(OfferOperation* operation)
+{
+ Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+ CHECK_SOME(uuid);
+
+ CHECK(offerOperations.contains(uuid.get()))
+ << "Unknown offer operation (uuid: " << uuid->toString() << ")";
+
+ CHECK(protobuf::isTerminalState(operation->latest_status().state()))
+ << operation->latest_status().state();
+
+ offerOperations.erase(uuid.get());
+ delete operation;
+}
+
+
+OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
+{
+ if (offerOperations.contains(uuid)) {
+ return offerOperations.at(uuid);
+ }
+ return nullptr;
+}
+
+
+void Slave::apply(const vector<ResourceConversion>& conversions)
+{
+ Try<Resources> resources = totalResources.apply(conversions);
+ CHECK_SOME(resources);
+
+ totalResources = resources.get();
+}
+
+
void Slave::qosCorrections()
{
qosController->corrections()
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0124df4..c0acaa6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -533,6 +533,20 @@ private:
void handleResourceProviderMessage(
const process::Future<ResourceProviderMessage>& message);
+ void addOfferOperation(OfferOperation* operation);
+
+ // Transitions the offer operation, and recovers resource if the
+ // offer operation becomes terminal.
+ void updateOfferOperation(
+ OfferOperation* operation,
+ const OfferOperationStatusUpdate& update);
+
+ void removeOfferOperation(OfferOperation* operation);
+
+ OfferOperation* getOfferOperation(const UUID& uuid) const;
+
+ void apply(const std::vector<ResourceConversion>& conversions);
+
// Gauge methods.
double _frameworks_active()
{
@@ -667,6 +681,10 @@ private:
process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+ // Pending operations or terminal operations that have
+ // unacknowledged status updates.
+ hashmap<UUID, OfferOperation*> offerOperations;
+
protected:
// Made protected for testing purposes.
mesos::SecretGenerator* secretGenerator;
http://git-wip-us.apache.org/repos/asf/mesos/blob/c946615e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 4008b1c..5624b23 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -316,6 +316,123 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
}
+TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
+{
+ const ContentType contentType = GetParam();
+
+ ResourceProviderManager manager;
+
+ Option<UUID> streamId;
+ Option<mesos::v1::ResourceProviderID> resourceProviderId;
+
+ // First, subscribe to the manager to get the ID.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+ Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+ CHECK_SOME(uuid);
+ streamId = uuid.get();
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ recordio::Reader<Event> responseDecoder(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get());
+
+ Future<Result<Event>> event = responseDecoder.read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+ resourceProviderId = event->get().subscribed().provider_id();
+
+ EXPECT_FALSE(resourceProviderId->value().empty());
+ }
+
+ // Then, send an offer operation update to the manager.
+ {
+ v1::FrameworkID frameworkId;
+ frameworkId.set_value("foo");
+
+ mesos::v1::OfferOperationStatus status;
+ status.set_state(mesos::v1::OfferOperationState::OFFER_OPERATION_FINISHED);
+
+ UUID operationUUID = UUID::random();
+
+ Call call;
+ call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+ Call::UpdateOfferOperationStatus* updateOfferOperationStatus =
+ call.mutable_update_offer_operation_status();
+ updateOfferOperationStatus->mutable_framework_id()->CopyFrom(frameworkId);
+ updateOfferOperationStatus->mutable_status()->CopyFrom(status);
+ updateOfferOperationStatus->set_operation_uuid(operationUUID.toBytes());
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
+
+ // The manager will send out a message informing its subscriber
+ // about the updated offer operation.
+ Future<ResourceProviderMessage> message = manager.messages().get();
+
+ AWAIT_READY(message);
+
+ EXPECT_EQ(
+ ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS,
+ message->type);
+ EXPECT_EQ(
+ devolve(resourceProviderId.get()),
+ message->updateOfferOperationStatus->id);
+ EXPECT_EQ(
+ devolve(frameworkId),
+ message->updateOfferOperationStatus->update.framework_id());
+ EXPECT_EQ(
+ devolve(status).state(),
+ message->updateOfferOperationStatus->update.status().state());
+ EXPECT_EQ(
+ operationUUID.toBytes(),
+ message->updateOfferOperationStatus->update.operation_uuid());
+ }
+}
+
+
// This test starts an agent and connects directly with its resource
// provider endpoint.
TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
[3/3] mesos git commit: Added a test for resource conversion using a
resource provider.
Posted by ji...@apache.org.
Added a test for resource conversion using a resource provider.
Review: https://reviews.apache.org/r/63625/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/663b893a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/663b893a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/663b893a
Branch: refs/heads/master
Commit: 663b893a0badb518d6572566a11fd015d76bf429
Parents: 560942d
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:28 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:28 2017 -0800
----------------------------------------------------------------------
src/tests/resource_provider_manager_tests.cpp | 288 ++++++++++++++++++++-
1 file changed, 284 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/663b893a/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 5624b23..ecfe2b4 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -15,6 +15,7 @@
// limitations under the License.
#include <string>
+#include <vector>
#include <gtest/gtest.h>
@@ -33,6 +34,8 @@
#include <process/gtest.hpp>
#include <process/http.hpp>
+#include <process/ssl/flags.hpp>
+
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/gtest.hpp>
@@ -70,10 +73,12 @@ using mesos::resource_provider::Registrar;
using mesos::resource_provider::RemoveResourceProvider;
using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Driver;
using mesos::v1::resource_provider::Event;
using process::Clock;
using process::Future;
+using process::Message;
using process::Owned;
using process::PID;
@@ -83,7 +88,10 @@ using process::http::OK;
using process::http::UnsupportedMediaType;
using std::string;
+using std::vector;
+using testing::Eq;
+using testing::SaveArg;
using testing::Values;
using testing::WithParamInterface;
@@ -93,7 +101,66 @@ namespace tests {
class ResourceProviderManagerHttpApiTest
: public MesosTest,
- public WithParamInterface<ContentType> {};
+ public WithParamInterface<ContentType>
+{
+public:
+ Future<Nothing> subscribe(
+ v1::MockResourceProvider* resourceProvider,
+ const mesos::v1::ResourceProviderInfo& info)
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+ return resourceProvider->send(call);
+ }
+
+ Future<Nothing> updateState(
+ v1::MockResourceProvider* resourceProvider,
+ const mesos::v1::ResourceProviderID& resourceProviderId,
+ const v1::Resources& resources)
+ {
+ Call call;
+ call.set_type(Call::UPDATE_STATE);
+ call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+ Call::UpdateState* updateState = call.mutable_update_state();
+ updateState->mutable_resources()->CopyFrom(resources);
+
+ updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+ return resourceProvider->send(call);
+ }
+
+ Future<Nothing> updateOfferOperationStatus(
+ v1::MockResourceProvider* resourceProvider,
+ const mesos::v1::ResourceProviderID& resourceProviderId,
+ const mesos::v1::FrameworkID& frameworkId,
+ const UUID& operationUUID,
+ const v1::Resources& convertedResources)
+ {
+ Call call;
+ call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+ Call::UpdateOfferOperationStatus* updateOfferOperationStatus =
+ call.mutable_update_offer_operation_status();
+
+ updateOfferOperationStatus->mutable_framework_id()->CopyFrom(frameworkId);
+
+ mesos::v1::OfferOperationStatus* status =
+ updateOfferOperationStatus->mutable_status();
+ status->mutable_converted_resources()->CopyFrom(convertedResources);
+ status->set_state(mesos::v1::OfferOperationState::OFFER_OPERATION_FINISHED);
+
+ updateOfferOperationStatus->mutable_latest_status()->CopyFrom(*status);
+
+ updateOfferOperationStatus->set_operation_uuid(operationUUID.toBytes());
+
+ return resourceProvider->send(call);
+ }
+};
// The tests are parameterized by the content type of the request.
@@ -418,9 +485,6 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS,
message->type);
EXPECT_EQ(
- devolve(resourceProviderId.get()),
- message->updateOfferOperationStatus->id);
- EXPECT_EQ(
devolve(frameworkId),
message->updateOfferOperationStatus->update.framework_id());
EXPECT_EQ(
@@ -580,6 +644,222 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
new RemoveResourceProvider(resourceProviderId))));
}
+
+// Test that resource provider resources are offered to frameworks,
+// frameworks can accept the offer with an operation that has a resource
+// provider convert resources and that the converted resources are
+// offered to frameworks as well.
+TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
+{
+ // Start master and agent.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability and other required capabilities.
+ constexpr SlaveInfo::Capability::Type capabilities[] = {
+ SlaveInfo::Capability::MULTI_ROLE,
+ SlaveInfo::Capability::HIERARCHICAL_ROLE,
+ SlaveInfo::Capability::RESERVATION_REFINEMENT,
+ SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ foreach (SlaveInfo::Capability::Type type, capabilities) {
+ SlaveInfo::Capability* capability =
+ slaveFlags.agent_features->add_capabilities();
+ capability->set_type(type);
+ }
+
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(agent);
+
+ AWAIT_READY(updateSlaveMessage);
+
+ v1::MockResourceProvider resourceProvider;
+
+ // Start and register a resource provider.
+ {
+ Future<Nothing> connected;
+ EXPECT_CALL(resourceProvider, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ http::URL url(
+ scheme,
+ agent.get()->pid.address.ip,
+ agent.get()->pid.address.port,
+ agent.get()->pid.id + "/api/v1/resource_provider");
+
+ Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+ const ContentType contentType = GetParam();
+
+ resourceProvider.start(
+ endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(connected);
+ }
+
+ Option<mesos::v1::ResourceProviderID> resourceProviderId;
+
+ {
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(resourceProvider, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed));
+
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ AWAIT_READY(subscribe(&resourceProvider, resourceProviderInfo));
+
+ AWAIT_READY(subscribed);
+
+ resourceProviderId = subscribed->provider_id();
+
+ ASSERT_FALSE(resourceProviderId->value().empty());
+ }
+
+ v1::Resource disk = v1::createDiskResource(
+ "200",
+ "*",
+ None(),
+ None(),
+ v1::createDiskSourceRaw());
+ disk.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ AWAIT_READY(updateState(&resourceProvider, resourceProviderId.get(), disk));
+
+ // Wait until the agent's resources have been updated to include the
+ // resource provider resources.
+ AWAIT_READY(updateSlaveMessage);
+
+ // Start and register a framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_)).WillOnce(FutureSatisfy(&connected));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Option<v1::FrameworkID> frameworkId;
+ Option<mesos::v1::Offer> offer;
+
+ {
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ mesos::v1::scheduler::Call call;
+ call.set_type(mesos::v1::scheduler::Call::SUBSCRIBE);
+ mesos::v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
+
+ mesos.send(call);
+
+ AWAIT_READY(subscribed);
+
+ frameworkId = subscribed->framework_id();
+
+ // Resource provider resources will be offered to the framework.
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers->offers().empty());
+ offer = offers->offers(0);
+ }
+
+ Future<Event::Operation> operation;
+
+ // Accept the offer with a 'CREATE_BLOCK' operation.
+ EXPECT_CALL(resourceProvider, operation(_))
+ .WillOnce(FutureArg<0>(&operation));
+
+ mesos.send(v1::createCallAccept(
+ frameworkId.get(),
+ offer.get(),
+ {v1::CREATE_BLOCK(disk)}));
+
+ AWAIT_READY(operation);
+
+ // Have the resource provider handle the operation, converting its resource.
+ {
+ ASSERT_TRUE(operation->info().has_create_block());
+
+ mesos::v1::Resource convertedResource;
+ convertedResource.CopyFrom(operation->info().create_block().source());
+ convertedResource.mutable_disk()->mutable_source()->set_type(
+ mesos::v1::Resource::DiskInfo::Source::BLOCK);
+
+ Try<UUID> operationUUID = UUID::fromBytes(operation->operation_uuid());
+ CHECK_SOME(operationUUID);
+
+ Clock::pause();
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ AWAIT_READY(updateOfferOperationStatus(
+ &resourceProvider,
+ resourceProviderId.get(),
+ operation->framework_id(),
+ operationUUID.get(),
+ convertedResource));
+
+ Clock::advance(masterFlags.allocation_interval);
+ Clock::settle();
+ Clock::resume();
+
+ // The converted resource should be offered to the framework.
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+
+ Option<v1::Resource> block;
+ foreach (const v1::Resource& resource, offer.resources()) {
+ if (resource.has_provider_id()) {
+ block = resource;
+ }
+ }
+
+ ASSERT_SOME(block);
+ EXPECT_EQ(
+ v1::Resource::DiskInfo::Source::BLOCK,
+ block->disk().source().type());
+ }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[2/3] mesos git commit: Added test helpers for storage operations.
Posted by ji...@apache.org.
Added test helpers for storage operations.
Review: https://reviews.apache.org/r/63679/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/560942dd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/560942dd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/560942dd
Branch: refs/heads/master
Commit: 560942ddab37dc341c4889c158149de3ab987d01
Parents: c946615
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Nov 13 09:29:23 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 13 09:29:23 2017 -0800
----------------------------------------------------------------------
src/tests/mesos.hpp | 107 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 107 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/560942dd/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index e25defe..4e2513f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1279,6 +1279,49 @@ inline typename TOffer::Operation LAUNCH_GROUP(
}
+template <typename TResource, typename TTargetType, typename TOffer>
+inline typename TOffer::Operation CREATE_VOLUME(
+ const TResource& source,
+ const TTargetType& type)
+{
+ typename TOffer::Operation operation;
+ operation.set_type(TOffer::Operation::CREATE_VOLUME);
+ operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
+ operation.set_target_type(type);
+ return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation DESTROY_VOLUME(const TResource& volume)
+{
+ typename TOffer::Operation operation;
+ operation.set_type(TOffer::Operation::DESTROY_VOLUME);
+ operation.mutable_destroy_volume()->mutable_volume()->CopyFrom(volume);
+ return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation CREATE_BLOCK(const TResource& source)
+{
+ typename TOffer::Operation operation;
+ operation.set_type(TOffer::Operation::CREATE_BLOCK);
+ operation.mutable_create_block()->mutable_source()->CopyFrom(source);
+ return operation;
+}
+
+
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation DESTROY_BLOCK(const TResource& block)
+{
+ typename TOffer::Operation operation;
+ operation.set_type(TOffer::Operation::DESTROY_BLOCK);
+ operation.mutable_destroy_block()->mutable_block()->CopyFrom(block);
+ return operation;
+}
+
+
template <typename TParameters, typename TParameter>
inline TParameters parameterize(const ACLs& acls)
{
@@ -1595,6 +1638,36 @@ inline Offer::Operation LAUNCH_GROUP(Args&&... args)
template <typename... Args>
+inline Offer::Operation CREATE_VOLUME(Args&&... args)
+{
+ return common::CREATE_VOLUME<Resource,
+ Resource::DiskInfo::Source::Type,
+ Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation DESTROY_VOLUME(Args&&... args)
+{
+ return common::DESTROY_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation CREATE_BLOCK(Args&&... args)
+{
+ return common::CREATE_BLOCK<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation DESTROY_BLOCK(Args&&... args)
+{
+ return common::DESTROY_BLOCK<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
inline Parameters parameterize(Args&&... args)
{
return common::parameterize<Parameters, Parameter>(
@@ -1865,6 +1938,40 @@ inline mesos::v1::Offer::Operation LAUNCH_GROUP(Args&&... args)
template <typename... Args>
+inline mesos::v1::Offer::Operation CREATE_VOLUME(Args&&... args)
+{
+ return common::CREATE_VOLUME<mesos::v1::Resource,
+ mesos::v1::Resource::DiskInfo::Source::Type,
+ mesos::v1::Offer>(
+ std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation DESTROY_VOLUME(Args&&... args)
+{
+ return common::DESTROY_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
+ std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation CREATE_BLOCK(Args&&... args)
+{
+ return common::CREATE_BLOCK<mesos::v1::Resource, mesos::v1::Offer>(
+ std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation DESTROY_BLOCK(Args&&... args)
+{
+ return common::DESTROY_BLOCK<mesos::v1::Resource, mesos::v1::Offer>(
+ std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
inline mesos::v1::Parameters parameterize(Args&&... args)
{
return common::parameterize<mesos::v1::Parameters, mesos::v1::Parameter>(