You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/04 19:05:33 UTC

[3/3] mesos git commit: Added a publish function in resource provider manager.

Added a publish function in resource provider manager.

The `ResourceProviderManager::publish()` takes a parameter that contains
all resources required by all executors on the agent and ensure the
resources are ready to use.

Review: https://reviews.apache.org/r/63554/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6f3c74a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6f3c74a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6f3c74a4

Branch: refs/heads/master
Commit: 6f3c74a4a88a37de1bbb0182233ffc2f8a88443f
Parents: 9d3268b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Dec 4 09:37:44 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 4 11:05:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 212 +++++++++++----
 src/resource_provider/manager.hpp             |   3 +
 src/tests/resource_provider_manager_tests.cpp | 291 +++++++++++++++++++++
 3 files changed, 461 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a007570..8d8b2f1 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -27,6 +27,7 @@
 
 #include <mesos/v1/resource_provider/resource_provider.hpp>
 
+#include <process/collect.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
@@ -46,6 +47,7 @@
 
 namespace http = process::http;
 
+using std::list;
 using std::string;
 
 using mesos::internal::resource_provider::validation::call::validate;
@@ -55,10 +57,13 @@ using mesos::resource_provider::Event;
 
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::Process;
 using process::ProcessBase;
+using process::Promise;
 using process::Queue;
 
+using process::collect;
 using process::dispatch;
 using process::spawn;
 using process::terminate;
@@ -124,9 +129,20 @@ struct ResourceProvider
     : info(_info),
       http(_http) {}
 
+  ~ResourceProvider()
+  {
+    http.close();
+
+    foreachvalue (const Owned<Promise<Nothing>>& publish, publishes) {
+      publish->fail(
+          "Failed to publish resources from resource provider " +
+          stringify(info.id()) + ": Connection closed");
+    }
+  }
+
   ResourceProviderInfo info;
   HttpConnection http;
-  Resources resources;
+  hashmap<UUID, Owned<Promise<Nothing>>> publishes;
 };
 
 
@@ -142,6 +158,8 @@ public:
 
   void applyOfferOperation(const ApplyOfferOperationMessage& message);
 
+  Future<Nothing> publish(const Resources& resources);
+
   Queue<ResourceProviderMessage> messages;
 
 private:
@@ -157,11 +175,15 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdateState& update);
 
+  void updatePublishStatus(
+      ResourceProvider* resourceProvider,
+      const Call::UpdatePublishStatus& update);
+
   ResourceProviderID newResourceProviderId();
 
   struct ResourceProviders
   {
-    hashmap<ResourceProviderID, ResourceProvider> subscribed;
+    hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
   } resourceProviders;
 };
 
@@ -263,8 +285,8 @@ Future<http::Response> ResourceProviderManagerProcess::api(
     return BadRequest("Resource provider is not subscribed");
   }
 
-  ResourceProvider& resourceProvider =
-    resourceProviders.subscribed.at(call.resource_provider_id());
+  ResourceProvider* resourceProvider =
+    resourceProviders.subscribed.at(call.resource_provider_id()).get();
 
   // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
   if (!request.headers.contains("Mesos-Stream-Id")) {
@@ -273,11 +295,11 @@ Future<http::Response> ResourceProviderManagerProcess::api(
   }
 
   const string& streamId = request.headers.at("Mesos-Stream-Id");
-  if (streamId != resourceProvider.http.streamId.toString()) {
+  if (streamId != resourceProvider->http.streamId.toString()) {
     return BadRequest(
         "The stream ID '" + streamId + "' included in this request "
         "didn't match the stream ID currently associated with "
-        " resource provider ID " + resourceProvider.info.id().value());
+        " resource provider ID " + resourceProvider->info.id().value());
   }
 
   switch(call.type()) {
@@ -292,20 +314,20 @@ Future<http::Response> ResourceProviderManagerProcess::api(
 
     case Call::UPDATE_OFFER_OPERATION_STATUS: {
       updateOfferOperationStatus(
-          &resourceProvider,
+          resourceProvider,
           call.update_offer_operation_status());
 
       return Accepted();
     }
 
     case Call::UPDATE_STATE: {
-      updateState(&resourceProvider, call.update_state());
+      updateState(resourceProvider, call.update_state());
       return Accepted();
     }
 
     case Call::UPDATE_PUBLISH_STATUS: {
-      // TODO(nfnt): Add a 'UPDATE_PUBLISH_STATUS' handler.
-      return NotImplemented();
+      updatePublishStatus(resourceProvider, call.update_publish_status());
+      return Accepted();
     }
   }
 
@@ -347,8 +369,8 @@ void ResourceProviderManagerProcess::applyOfferOperation(
     return;
   }
 
-  ResourceProvider& resourceProvider =
-    resourceProviders.subscribed.at(resourceProviderId.get());
+  ResourceProvider* resourceProvider =
+    resourceProviders.subscribed.at(resourceProviderId.get()).get();
 
   CHECK(message.resource_version_uuid().has_resource_provider_id());
 
@@ -367,7 +389,7 @@ void ResourceProviderManagerProcess::applyOfferOperation(
   event.mutable_operation()->set_resource_version_uuid(
       message.resource_version_uuid().uuid());
 
-  if (!resourceProvider.http.send(event)) {
+  if (!resourceProvider->http.send(event)) {
     LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
                  << "(uuid: " << uuid.get() << ") from framework "
                  << frameworkId << " to resource provider "
@@ -376,6 +398,63 @@ void ResourceProviderManagerProcess::applyOfferOperation(
 }
 
 
+Future<Nothing> ResourceProviderManagerProcess::publish(
+    const Resources& resources)
+{
+  hashmap<ResourceProviderID, Resources> providedResources;
+
+  foreach (const Resource& resource, resources) {
+    // NOTE: We ignore agent default resources here because those
+    // resources do not need publish, and shouldn't be handled by the
+    // resource provider manager.
+    if (!resource.has_provider_id()) {
+      continue;
+    }
+
+    const ResourceProviderID& resourceProviderId = resource.provider_id();
+
+    if (!resourceProviders.subscribed.contains(resourceProviderId)) {
+      // TODO(chhsiao): If the manager is running on an agent and the
+      // resource comes from an external resource provider, we may want
+      // to load the provider's agent component.
+      return Failure(
+          "Resource provider " + stringify(resourceProviderId) +
+          " is not subscribed");
+    }
+
+    providedResources[resourceProviderId] += resource;
+  }
+
+  list<Future<Nothing>> futures;
+
+  foreachpair (const ResourceProviderID& resourceProviderId,
+               const Resources& resources,
+               providedResources) {
+    UUID uuid = UUID::random();
+
+    Event event;
+    event.set_type(Event::PUBLISH);
+    event.mutable_publish()->set_uuid(uuid.toBytes());
+    event.mutable_publish()->mutable_resources()->CopyFrom(resources);
+
+    ResourceProvider* resourceProvider =
+      resourceProviders.subscribed.at(resourceProviderId).get();
+
+    if (!resourceProvider->http.send(event)) {
+      return Failure(
+          "Failed to send PUBLISH event to resource provider " +
+          stringify(resourceProviderId) + ": connection closed");
+    }
+
+    Owned<Promise<Nothing>> promise(new Promise<Nothing>());
+    futures.push_back(promise->future());
+    resourceProvider->publishes.put(uuid, std::move(promise));
+  }
+
+  return collect(futures).then([] { return Nothing(); });
+}
+
+
 void ResourceProviderManagerProcess::subscribe(
     const HttpConnection& http,
     const Call::Subscribe& subscribe)
@@ -385,43 +464,49 @@ void ResourceProviderManagerProcess::subscribe(
 
   LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
 
-  ResourceProvider resourceProvider(resourceProviderInfo, http);
+  // We always create a new `ResourceProvider` struct when a
+  // resource provider subscribes or resubscribes, and replace the
+  // existing `ResourceProvider` if needed.
+  Owned<ResourceProvider> resourceProvider(
+      new ResourceProvider(resourceProviderInfo, http));
 
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
-    resourceProvider.info.mutable_id()->CopyFrom(newResourceProviderId());
-
-    // TODO(jieyu): Start heartbeat for the resource provider.
-
-    resourceProviders.subscribed.put(
-        resourceProvider.info.id(),
-        resourceProvider);
+    resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
   } else {
-    if (resourceProviders.subscribed.contains(resourceProviderInfo.id())) {
-      // Resource provider is resubscribing after failing over.
-      // TODO(nfnt): Test if old and new 'ResourceProviderInfo' match.
-      ResourceProvider& _resourceProvider =
-        resourceProviders.subscribed.at(resourceProviderInfo.id());
-
-      _resourceProvider.http.close();
-      _resourceProvider.http = http;
-    } else {
-      // Resource provider is resubscribing after an agent failover.
-      resourceProviders.subscribed.put(
-          resourceProviderInfo.id(), resourceProvider);
-    }
+    // TODO(chhsiao): The resource provider is resubscribing after being
+    // restarted or an agent failover. The 'ResourceProviderInfo' might
+    // have been updated, but its type and name should remain the same.
+    // We should checkpoint its 'type', 'name' and ID, then check if the
+    // resubscribption is consistent with the checkpointed record.
   }
 
+  const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
+
   Event event;
   event.set_type(Event::SUBSCRIBED);
-  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
-      resourceProvider.info.id());
+  event.mutable_subscribed()->mutable_provider_id()
+    ->CopyFrom(resourceProviderId);
 
-  if (!resourceProviders.subscribed.at(resourceProvider.info.id()).http.send(
-          event)) {
+  if (!resourceProvider->http.send(event)) {
     LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
-                 << resourceProvider.info.id() << ": connection closed";
+                 << resourceProviderId << ": connection closed";
+    return;
   }
+
+  http.closed()
+    .onAny(defer(self(), [=](const Future<Nothing>&) {
+      CHECK(resourceProviders.subscribed.contains(resourceProviderId));
+
+      // NOTE: All pending futures of publish requests for the resource
+      // provider will become failed.
+      resourceProviders.subscribed.erase(resourceProviderId);
+    }));
+
+  // TODO(jieyu): Start heartbeat for the resource provider.
+  resourceProviders.subscribed.put(
+      resourceProviderId,
+      std::move(resourceProvider));
 }
 
 
@@ -449,15 +534,10 @@ void ResourceProviderManagerProcess::updateState(
     ResourceProvider* resourceProvider,
     const Call::UpdateState& update)
 {
-  Resources resources;
-
   foreach (const Resource& resource, update.resources()) {
     CHECK_EQ(resource.provider_id(), resourceProvider->info.id());
-    resources += resource;
   }
 
-  resourceProvider->resources = std::move(resources);
-
   // TODO(chhsiao): Report pending operations.
 
   Try<UUID> resourceVersionUuid =
@@ -470,7 +550,7 @@ void ResourceProviderManagerProcess::updateState(
   ResourceProviderMessage::UpdateState updateState{
       resourceProvider->info.id(),
       resourceVersionUuid.get(),
-      resourceProvider->resources,
+      update.resources(),
       {update.operations().begin(), update.operations().end()}};
 
   ResourceProviderMessage message;
@@ -481,6 +561,39 @@ void ResourceProviderManagerProcess::updateState(
 }
 
 
+void ResourceProviderManagerProcess::updatePublishStatus(
+    ResourceProvider* resourceProvider,
+    const Call::UpdatePublishStatus& update)
+{
+  Try<UUID> uuid = UUID::fromBytes(update.uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Invalid UUID in UpdatePublishStatus from resource provider "
+               << resourceProvider->info.id() << ": " << uuid.error();
+    return;
+  }
+
+  if (!resourceProvider->publishes.contains(uuid.get())) {
+    LOG(ERROR) << "Ignoring UpdatePublishStatus from resource provider "
+               << resourceProvider->info.id() << " because UUID "
+               << uuid->toString() << " is unknown";
+    return;
+  }
+
+  if (update.status() == Call::UpdatePublishStatus::OK) {
+    resourceProvider->publishes.at(uuid.get())->set(Nothing());
+  } else {
+    // TODO(jieyu): Consider to include an error message in
+    // 'UpdatePublishStatus' and surface that to the caller.
+    resourceProvider->publishes.at(uuid.get())->fail(
+        "Failed to publish resources for resource provider " +
+        stringify(resourceProvider->info.id()) + ": Received " +
+        stringify(update.status()) + " status");
+  }
+
+  resourceProvider->publishes.erase(uuid.get());
+}
+
+
 ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
 {
   ResourceProviderID resourceProviderId;
@@ -525,6 +638,15 @@ void ResourceProviderManager::applyOfferOperation(
 }
 
 
+Future<Nothing> ResourceProviderManager::publish(const Resources& resources)
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::publish,
+      resources);
+}
+
+
 Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
 {
   return process->messages;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index e7a9a6c..c2aeb15 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -53,6 +53,9 @@ public:
 
   void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
 
+  // Ensure that the resources are ready for use.
+  process::Future<Nothing> publish(const Resources& resources);
+
   // Returns a stream of messages from the resource provider manager.
   process::Queue<ResourceProviderMessage> messages() const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f3c74a4/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index feede62..a4c19ca 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -459,6 +459,297 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOfferOperationStatus)
 }
 
 
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes ready when the manager
+// receives an publish status update with an `OK` status.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishSuccess)
+{
+  const ContentType contentType = GetParam();
+
+  ResourceProviderManager manager;
+
+  Option<UUID> streamId;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+  Owned<recordio::Reader<Event>> responseDecoder;
+
+  // First, subscribe to the manager to get the ID.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+    Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+    CHECK_SOME(uuid);
+    streamId = uuid.get();
+
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
+
+    responseDecoder.reset(new recordio::Reader<Event>(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get()));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    // Check event type is subscribed and the resource provider id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+    resourceProviderId = event->get().subscribed().provider_id();
+
+    EXPECT_FALSE(resourceProviderId->value().empty());
+  }
+
+  // Then, update the publish status with `OK`.
+  {
+    vector<v1::Resource> resources =
+      v1::Resources::fromString("disk:4").get();
+    foreach (v1::Resource& resource, resources) {
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+    }
+
+    Future<Nothing> published = manager.publish(devolve(resources));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+    ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+    Call call;
+    call.set_type(Call::UPDATE_PUBLISH_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+    Call::UpdatePublishStatus* update = call.mutable_update_publish_status();
+    update->set_uuid(event->get().publish().uuid());
+    update->set_status(Call::UpdatePublishStatus::OK);
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+    request.body = serialize(contentType, call);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        Accepted().status,
+        manager.api(request, None()));
+
+    // The manager should satisfy the future.
+    AWAIT_READY(published);
+  }
+}
+
+
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes failed when the manager
+// receives an publish status update with a `FAILED` status.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishFailure)
+{
+  const ContentType contentType = GetParam();
+
+  ResourceProviderManager manager;
+
+  Option<UUID> streamId;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+  Owned<recordio::Reader<Event>> responseDecoder;
+
+  // First, subscribe to the manager to get the ID.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
+    Try<UUID> uuid = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
+
+    CHECK_SOME(uuid);
+    streamId = uuid.get();
+
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
+
+    responseDecoder.reset(new recordio::Reader<Event>(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get()));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    // Check event type is subscribed and the resource provider id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+    resourceProviderId = event->get().subscribed().provider_id();
+
+    EXPECT_FALSE(resourceProviderId->value().empty());
+  }
+
+  // Then, update the publish status with `FAILED`.
+  {
+    vector<v1::Resource> resources =
+      v1::Resources::fromString("disk:4").get();
+    foreach (v1::Resource& resource, resources) {
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+    }
+
+    Future<Nothing> published = manager.publish(devolve(resources));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+    ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+    Call call;
+    call.set_type(Call::UPDATE_PUBLISH_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+
+    Call::UpdatePublishStatus* update = call.mutable_update_publish_status();
+    update->set_uuid(event->get().publish().uuid());
+    update->set_status(Call::UpdatePublishStatus::FAILED);
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
+    request.body = serialize(contentType, call);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        Accepted().status,
+        manager.api(request, None()));
+
+    // The manager should fail the future.
+    AWAIT_FAILED(published);
+  }
+}
+
+
+// This test verifies that the pending future returned by
+// `ResourceProviderManager::publish()` becomes failed when the resource
+// provider is disconnected.
+TEST_P(ResourceProviderManagerHttpApiTest, PublishDisconnected)
+{
+  const ContentType contentType = GetParam();
+
+  ResourceProviderManager manager;
+
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+  Option<http::Pipe::Reader> reader;
+  Owned<recordio::Reader<Event>> responseDecoder;
+
+  // First, subscribe to the manager to get the ID.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    reader = response->reader;
+    ASSERT_SOME(reader);
+
+    responseDecoder.reset(new recordio::Reader<Event>(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get()));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+
+    // Check event type is subscribed and the resource provider id is set.
+    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+
+    resourceProviderId = event->get().subscribed().provider_id();
+
+    EXPECT_FALSE(resourceProviderId->value().empty());
+  }
+
+  // Then, close the connection after receiving a publish event.
+  {
+    vector<v1::Resource> resources =
+      v1::Resources::fromString("disk:4").get();
+    foreach (v1::Resource& resource, resources) {
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+    }
+
+    Future<Nothing> published = manager.publish(devolve(resources));
+
+    Future<Result<Event>> event = responseDecoder->read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get());
+    ASSERT_EQ(Event::PUBLISH, event->get().type());
+
+    reader->close();
+
+    // The manager should fail the future.
+    AWAIT_FAILED(published);
+  }
+}
+
+
 // This test starts an agent and connects directly with its resource
 // provider endpoint.
 TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)