You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/04 19:05:33 UTC
[3/3] mesos git commit: Added a publish function in resource provider
manager.
Added a publish function in resource provider manager.
`ResourceProviderManager::publish()` takes a parameter that contains
all resources required by all executors on the agent and ensures that
those resources are ready to use.
Review: https://reviews.apache.org/r/63554/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6f3c74a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6f3c74a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6f3c74a4
Branch: refs/heads/master
Commit: 6f3c74a4a88a37de1bbb0182233ffc2f8a88443f
Parents: 9d3268b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Dec 4 09:37:44 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 4 11:05:22 2017 -0800
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 212 +++++++++++----
src/resource_provider/manager.hpp | 3 +
src/tests/resource_provider_manager_tests.cpp | 291 +++++++++++++++++++++
3 files changed, 461 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a007570..8d8b2f1 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -27,6 +27,7 @@
#include <mesos/v1/resource_provider/resource_provider.hpp>
+#include <process/collect.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
@@ -46,6 +47,7 @@
namespace http = process::http;
+using std::list;
using std::string;
using mesos::internal::resource_provider::validation::call::validate;
@@ -55,10 +57,13 @@ using mesos::resource_provider::Event;
using process::Failure;
using process::Future;
+using process::Owned;
using process::Process;
using process::ProcessBase;
+using process::Promise;
using process::Queue;
+using process::collect;
using process::dispatch;
using process::spawn;
using process::terminate;
@@ -124,9 +129,20 @@ struct ResourceProvider
: info(_info),
http(_http) {}
+ ~ResourceProvider()
+ {
+ http.close();
+
+ foreachvalue (const Owned<Promise<Nothing>>& publish, publishes) {
+ publish->fail(
+ "Failed to publish resources from resource provider " +
+ stringify(info.id()) + ": Connection closed");
+ }
+ }
+
ResourceProviderInfo info;
HttpConnection http;
- Resources resources;
+ hashmap<UUID, Owned<Promise<Nothing>>> publishes;
};
@@ -142,6 +158,8 @@ public:
void applyOfferOperation(const ApplyOfferOperationMessage& message);
+ Future<Nothing> publish(const Resources& resources);
+
Queue<ResourceProviderMessage> messages;
private:
@@ -157,11 +175,15 @@ private:
ResourceProvider* resourceProvider,
const Call::UpdateState& update);
+ void updatePublishStatus(
+ ResourceProvider* resourceProvider,
+ const Call::UpdatePublishStatus& update);
+
ResourceProviderID newResourceProviderId();
struct ResourceProviders
{
- hashmap<ResourceProviderID, ResourceProvider> subscribed;
+ hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
} resourceProviders;
};
@@ -263,8 +285,8 @@ Future<http::Response> ResourceProviderManagerProcess::api(
return BadRequest("Resource provider is not subscribed");
}
- ResourceProvider& resourceProvider =
- resourceProviders.subscribed.at(call.resource_provider_id());
+ ResourceProvider* resourceProvider =
+ resourceProviders.subscribed.at(call.resource_provider_id()).get();
// This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
if (!request.headers.contains("Mesos-Stream-Id")) {
@@ -273,11 +295,11 @@ Future<http::Response> ResourceProviderManagerProcess::api(
}
const string& streamId = request.headers.at("Mesos-Stream-Id");
- if (streamId != resourceProvider.http.streamId.toString()) {
+ if (streamId != resourceProvider->http.streamId.toString()) {
return BadRequest(
"The stream ID '" + streamId + "' included in this request "
"didn't match the stream ID currently associated with "
- " resource provider ID " + resourceProvider.info.id().value());
+ " resource provider ID " + resourceProvider->info.id().value());
}
switch(call.type()) {
@@ -292,20 +314,20 @@ Future<http::Response> ResourceProviderManagerProcess::api(
case Call::UPDATE_OFFER_OPERATION_STATUS: {
updateOfferOperationStatus(
- &resourceProvider,
+ resourceProvider,
call.update_offer_operation_status());
return Accepted();
}
case Call::UPDATE_STATE: {
- updateState(&resourceProvider, call.update_state());
+ updateState(resourceProvider, call.update_state());
return Accepted();
}
case Call::UPDATE_PUBLISH_STATUS: {
- // TODO(nfnt): Add a 'UPDATE_PUBLISH_STATUS' handler.
- return NotImplemented();
+ updatePublishStatus(resourceProvider, call.update_publish_status());
+ return Accepted();
}
}
@@ -347,8 +369,8 @@ void ResourceProviderManagerProcess::applyOfferOperation(
return;
}
- ResourceProvider& resourceProvider =
- resourceProviders.subscribed.at(resourceProviderId.get());
+ ResourceProvider* resourceProvider =
+ resourceProviders.subscribed.at(resourceProviderId.get()).get();
CHECK(message.resource_version_uuid().has_resource_provider_id());
@@ -367,7 +389,7 @@ void ResourceProviderManagerProcess::applyOfferOperation(
event.mutable_operation()->set_resource_version_uuid(
message.resource_version_uuid().uuid());
- if (!resourceProvider.http.send(event)) {
+ if (!resourceProvider->http.send(event)) {
LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
<< "(uuid: " << uuid.get() << ") from framework "
<< frameworkId << " to resource provider "
@@ -376,6 +398,63 @@ void ResourceProviderManagerProcess::applyOfferOperation(
}
+Future<Nothing> ResourceProviderManagerProcess::publish(
+ const Resources& resources)
+{
+ hashmap<ResourceProviderID, Resources> providedResources;
+
+ foreach (const Resource& resource, resources) {
+ // NOTE: We ignore agent default resources here because those
+ // resources do not need publish, and shouldn't be handled by the
+ // resource provider manager.
+ if (!resource.has_provider_id()) {
+ continue;
+ }
+
+ const ResourceProviderID& resourceProviderId = resource.provider_id();
+
+ if (!resourceProviders.subscribed.contains(resourceProviderId)) {
+ // TODO(chhsiao): If the manager is running on an agent and the
+ // resource comes from an external resource provider, we may want
+ // to load the provider's agent component.
+ return Failure(
+ "Resource provider " + stringify(resourceProviderId) +
+ " is not subscribed");
+ }
+
+ providedResources[resourceProviderId] += resource;
+ }
+
+ list<Future<Nothing>> futures;
+
+ foreachpair (const ResourceProviderID& resourceProviderId,
+ const Resources& resources,
+ providedResources) {
+ UUID uuid = UUID::random();
+
+ Event event;
+ event.set_type(Event::PUBLISH);
+ event.mutable_publish()->set_uuid(uuid.toBytes());
+ event.mutable_publish()->mutable_resources()->CopyFrom(resources);
+
+ ResourceProvider* resourceProvider =
+ resourceProviders.subscribed.at(resourceProviderId).get();
+
+ if (!resourceProvider->http.send(event)) {
+ return Failure(
+ "Failed to send PUBLISH event to resource provider " +
+ stringify(resourceProviderId) + ": connection closed");
+ }
+
+ Owned<Promise<Nothing>> promise(new Promise<Nothing>());
+ futures.push_back(promise->future());
+ resourceProvider->publishes.put(uuid, std::move(promise));
+ }
+
+ return collect(futures).then([] { return Nothing(); });
+}
+
+
void ResourceProviderManagerProcess::subscribe(
const HttpConnection& http,
const Call::Subscribe& subscribe)
@@ -385,43 +464,49 @@ void ResourceProviderManagerProcess::subscribe(
LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
- ResourceProvider resourceProvider(resourceProviderInfo, http);
+ // We always create a new `ResourceProvider` struct when a
+ // resource provider subscribes or resubscribes, and replace the
+ // existing `ResourceProvider` if needed.
+ Owned<ResourceProvider> resourceProvider(
+ new ResourceProvider(resourceProviderInfo, http));
if (!resourceProviderInfo.has_id()) {
// The resource provider is subscribing for the first time.
- resourceProvider.info.mutable_id()->CopyFrom(newResourceProviderId());
-
- // TODO(jieyu): Start heartbeat for the resource provider.
-
- resourceProviders.subscribed.put(
- resourceProvider.info.id(),
- resourceProvider);
+ resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
} else {
- if (resourceProviders.subscribed.contains(resourceProviderInfo.id())) {
- // Resource provider is resubscribing after failing over.
- // TODO(nfnt): Test if old and new 'ResourceProviderInfo' match.
- ResourceProvider& _resourceProvider =
- resourceProviders.subscribed.at(resourceProviderInfo.id());
-
- _resourceProvider.http.close();
- _resourceProvider.http = http;
- } else {
- // Resource provider is resubscribing after an agent failover.
- resourceProviders.subscribed.put(
- resourceProviderInfo.id(), resourceProvider);
- }
+ // TODO(chhsiao): The resource provider is resubscribing after being
+ // restarted or an agent failover. The 'ResourceProviderInfo' might
+ // have been updated, but its type and name should remain the same.
+ // We should checkpoint its 'type', 'name' and ID, then check if the
+ // resubscription is consistent with the checkpointed record.
}
+ const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
+
Event event;
event.set_type(Event::SUBSCRIBED);
- event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
- resourceProvider.info.id());
+ event.mutable_subscribed()->mutable_provider_id()
+ ->CopyFrom(resourceProviderId);
- if (!resourceProviders.subscribed.at(resourceProvider.info.id()).http.send(
- event)) {
+ if (!resourceProvider->http.send(event)) {
LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
- << resourceProvider.info.id() << ": connection closed";
+ << resourceProviderId << ": connection closed";
+ return;
}
+
+ http.closed()
+ .onAny(defer(self(), [=](const Future<Nothing>&) {
+ CHECK(resourceProviders.subscribed.contains(resourceProviderId));
+
+ // NOTE: All pending futures of publish requests for the resource
+ // provider will become failed.
+ resourceProviders.subscribed.erase(resourceProviderId);
+ }));
+
+ // TODO(jieyu): Start heartbeat for the resource provider.
+ resourceProviders.subscribed.put(
+ resourceProviderId,
+ std::move(resourceProvider));
}
@@ -449,15 +534,10 @@ void ResourceProviderManagerProcess::updateState(
ResourceProvider* resourceProvider,
const Call::UpdateState& update)
{
- Resources resources;
-
foreach (const Resource& resource, update.resources()) {
CHECK_EQ(resource.provider_id(), resourceProvider->info.id());
- resources += resource;
}
- resourceProvider->resources = std::move(resources);
-
// TODO(chhsiao): Report pending operations.
Try<UUID> resourceVersionUuid =
@@ -470,7 +550,7 @@ void ResourceProviderManagerProcess::updateState(
ResourceProviderMessage::UpdateState updateState{
resourceProvider->info.id(),
resourceVersionUuid.get(),
- resourceProvider->resources,
+ update.resources(),
{update.operations().begin(), update.operations().end()}};
ResourceProviderMessage message;
@@ -481,6 +561,39 @@ void ResourceProviderManagerProcess::updateState(
}
+void ResourceProviderManagerProcess::updatePublishStatus(
+ ResourceProvider* resourceProvider,
+ const Call::UpdatePublishStatus& update)
+{
+ Try<UUID> uuid = UUID::fromBytes(update.uuid());
+ if (uuid.isError()) {
+ LOG(ERROR) << "Invalid UUID in UpdatePublishStatus from resource provider "
+ << resourceProvider->info.id() << ": " << uuid.error();
+ return;
+ }
+
+ if (!resourceProvider->publishes.contains(uuid.get())) {
+ LOG(ERROR) << "Ignoring UpdatePublishStatus from resource provider "
+ << resourceProvider->info.id() << " because UUID "
+ << uuid->toString() << " is unknown";
+ return;
+ }
+
+ if (update.status() == Call::UpdatePublishStatus::OK) {
+ resourceProvider->publishes.at(uuid.get())->set(Nothing());
+ } else {
+ // TODO(jieyu): Consider to include an error message in
+ // 'UpdatePublishStatus' and surface that to the caller.
+ resourceProvider->publishes.at(uuid.get())->fail(
+ "Failed to publish resources for resource provider " +
+ stringify(resourceProvider->info.id()) + ": Received " +
+ stringify(update.status()) + " status");
+ }
+
+ resourceProvider->publishes.erase(uuid.get());
+}
+
+
ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
{
ResourceProviderID resourceProviderId;
@@ -525,6 +638,15 @@ void ResourceProviderManager::applyOfferOperation(
}
+Future<Nothing> ResourceProviderManager::publish(const Resources& resources)
+{
+ return dispatch(
+ process.get(),
+ &ResourceProviderManagerProcess::publish,
+ resources);
+}
+
+
Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
{
return process->messages;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index e7a9a6c..c2aeb15 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -53,6 +53,9 @@ public:
void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
+ // Ensure that the resources are ready for use.
+ process::Future<Nothing> publish(const Resources& resources);
+
// Returns a stream of messages from the resource provider manager.
process::Queue<ResourceProviderMessage> messages() const;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index feede62..a4c19ca 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -459,6 +459,297 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
}
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes ready when the manager
+// receives a publish status update with an `OK` status.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishSuccess)
+{
+ const ContentType contentType = GetParam();
+
+ ResourceProviderManager manager;
+
+ Option<UUID> streamId;
+ Option<mesos::v1::ResourceProviderID> resourceProviderId;
+ Owned<recordio::Reader<Event>> responseDecoder;
+
+ // First, subscribe to the manager to get the ID.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+ Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+ CHECK_SOME(uuid);
+ streamId = uuid.get();
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ responseDecoder.reset(new recordio::Reader<Event>(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get()));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+ resourceProviderId = event->get().subscribed().provider_id();
+
+ EXPECT_FALSE(resourceProviderId->value().empty());
+ }
+
+ // Then, update the publish status with `OK`.
+ {
+ vector<v1::Resource> resources =
+ v1::Resources::fromString("disk:4").get();
+ foreach (v1::Resource& resource, resources) {
+ resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+ }
+
+ Future<Nothing> published = manager.publish(devolve(resources));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+ ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+ Call call;
+ call.set_type(Call::UPDATE_PUBLISH_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+ Call::UpdatePublishStatus* update = call.mutable_update_publish_status();
+ update->set_uuid(event->get().publish().uuid());
+ update->set_status(Call::UpdatePublishStatus::OK);
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+ request.body = serialize(contentType, call);
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+ Accepted().status,
+ manager.api(request, None()));
+
+ // The manager should satisfy the future.
+ AWAIT_READY(published);
+ }
+}
+
+
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes failed when the manager
+// receives a publish status update with a `FAILED` status.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishFailure)
+{
+ const ContentType contentType = GetParam();
+
+ ResourceProviderManager manager;
+
+ Option<UUID> streamId;
+ Option<mesos::v1::ResourceProviderID> resourceProviderId;
+ Owned<recordio::Reader<Event>> responseDecoder;
+
+ // First, subscribe to the manager to get the ID.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+ Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+ CHECK_SOME(uuid);
+ streamId = uuid.get();
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ responseDecoder.reset(new recordio::Reader<Event>(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get()));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+ resourceProviderId = event->get().subscribed().provider_id();
+
+ EXPECT_FALSE(resourceProviderId->value().empty());
+ }
+
+ // Then, update the publish status with `FAILED`.
+ {
+ vector<v1::Resource> resources =
+ v1::Resources::fromString("disk:4").get();
+ foreach (v1::Resource& resource, resources) {
+ resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+ }
+
+ Future<Nothing> published = manager.publish(devolve(resources));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+ ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+ Call call;
+ call.set_type(Call::UPDATE_PUBLISH_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+ Call::UpdatePublishStatus* update = call.mutable_update_publish_status();
+ update->set_uuid(event->get().publish().uuid());
+ update->set_status(Call::UpdatePublishStatus::FAILED);
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+ request.body = serialize(contentType, call);
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+ Accepted().status,
+ manager.api(request, None()));
+
+ // The manager should fail the future.
+ AWAIT_FAILED(published);
+ }
+}
+
+
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes failed when the resource
+// provider is disconnected.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishDisconnected)
+{
+ const ContentType contentType = GetParam();
+
+ ResourceProviderManager manager;
+
+ Option<mesos::v1::ResourceProviderID> resourceProviderId;
+ Option<http::Pipe::Reader> reader;
+ Owned<recordio::Reader<Event>> responseDecoder;
+
+ // First, subscribe to the manager to get the ID.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ reader = response->reader;
+ ASSERT_SOME(reader);
+
+ responseDecoder.reset(new recordio::Reader<Event>(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get()));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+ resourceProviderId = event->get().subscribed().provider_id();
+
+ EXPECT_FALSE(resourceProviderId->value().empty());
+ }
+
+ // Then, close the connection after receiving a publish event.
+ {
+ vector<v1::Resource> resources =
+ v1::Resources::fromString("disk:4").get();
+ foreach (v1::Resource& resource, resources) {
+ resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+ }
+
+ Future<Nothing> published = manager.publish(devolve(resources));
+
+ Future<Result<Event>> event = responseDecoder->read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+ ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+ reader->close();
+
+ // The manager should fail the future.
+ AWAIT_FAILED(published);
+ }
+}
+
+
// This test starts an agent and connects directly with its resource
// provider endpoint.
TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)