You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/07/27 00:15:32 UTC

[3/3] mesos git commit: Implemented the 'SUBSCRIBE' call in the resource provider manager.

Implemented the 'SUBSCRIBE' call in the resource provider manager.

Using HTTP, resource providers are expected to subscribe to the resource
provider manager. The resource provider manager will assign a unique ID
to each subscribed resource provider and use events to communicate with
it.

Review: https://reviews.apache.org/r/60771/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6827a483
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6827a483
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6827a483

Branch: refs/heads/master
Commit: 6827a48398a4db9306de2e23ed14f5abbb3780b2
Parents: 7528f59
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:51 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:51 2017 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp              | 237 +++++++++++++++++++-
 src/tests/resource_provider_http_api_tests.cpp | 218 ++++++++++++++++--
 2 files changed, 437 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a9bf258..44e1576 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -18,12 +18,38 @@
 
 #include <glog/logging.h>
 
+#include <string>
+#include <utility>
+
+#include <mesos/http.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/hashmap.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "resource_provider/validation.hpp"
+
 namespace http = process::http;
 
+using mesos::internal::resource_provider::validation::call::validate;
+
+using mesos::resource_provider::Call;
+using mesos::resource_provider::Event;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -36,31 +62,236 @@ using process::spawn;
 using process::terminate;
 using process::wait;
 
+using process::http::Accepted;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::MethodNotAllowed;
+using process::http::NotAcceptable;
+using process::http::NotImplemented;
+using process::http::Pipe;
+using process::http::UnsupportedMediaType;
+
 using process::http::authentication::Principal;
 
+using std::string;
+
+
 namespace mesos {
 namespace internal {
 
+// Represents the streaming HTTP connection to a resource provider.
+struct HttpConnection
+{
+  HttpConnection(const http::Pipe::Writer& _writer,
+                 ContentType _contentType)
+    : writer(_writer),
+      contentType(_contentType),
+      encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+
+  // Converts the message to an Event before sending.
+  template <typename Message>
+  bool send(const Message& message)
+  {
+    // We need to evolve the internal 'message' into a
+    // 'v1::resource_provider::Event'.
+    return writer.write(encoder.encode(evolve(message)));
+  }
+
+  bool close()
+  {
+    return writer.close();
+  }
+
+  Future<Nothing> closed() const
+  {
+    return writer.readerClosed();
+  }
+
+  http::Pipe::Writer writer;
+  ContentType contentType;
+  ::recordio::Encoder<v1::resource_provider::Event> encoder;
+};
+
+
+struct ResourceProvider
+{
+  ResourceProvider(
+      const ResourceProviderInfo& _info,
+      const HttpConnection& _http)
+    : info(_info), http(_http) {}
+
+  ResourceProviderInfo info;
+  HttpConnection http;
+};
+
+
 class ResourceProviderManagerProcess
   : public Process<ResourceProviderManagerProcess>
 {
 public:
-  ResourceProviderManagerProcess()
-    : ProcessBase(process::ID::generate("resource-provider-manager")) {}
+  ResourceProviderManagerProcess();
 
   Future<http::Response> api(
       const http::Request& request,
       const Option<Principal>& principal);
 
   Queue<ResourceProviderMessage> messages;
+
+  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+
+private:
+  void subscribe(
+      const HttpConnection& http,
+      const Call::Subscribe& subscribe);
+
+  void update(
+      ResourceProvider* resourceProvider,
+      const Call::Update& update);
+
+  ResourceProviderID newResourceProviderId();
 };
 
 
+ResourceProviderManagerProcess::ResourceProviderManagerProcess()
+  : ProcessBase(process::ID::generate("resource-provider-manager"))
+{
+}
+
+
 Future<http::Response> ResourceProviderManagerProcess::api(
     const http::Request& request,
     const Option<Principal>& principal)
 {
-  return Failure("Unimplemented");
+  if (request.method != "POST") {
+    return MethodNotAllowed({"POST"}, request.method);
+  }
+
+  v1::resource_provider::Call v1Call;
+
+  // TODO(anand): Content type values are case-insensitive.
+  Option<string> contentType = request.headers.get("Content-Type");
+
+  if (contentType.isNone()) {
+    return BadRequest("Expecting 'Content-Type' to be present");
+  }
+
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!v1Call.ParseFromString(request.body)) {
+      return BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+    if (value.isError()) {
+      return BadRequest("Failed to parse body into JSON: " + value.error());
+    }
+
+    Try<v1::resource_provider::Call> parse =
+      ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+    if (parse.isError()) {
+      return BadRequest("Failed to convert JSON into Call protobuf: " +
+                        parse.error());
+    }
+
+    v1Call = parse.get();
+  } else {
+    return UnsupportedMediaType(
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+  }
+
+  Call call = devolve(v1Call);
+
+  Option<Error> error = validate(call);
+  if (error.isSome()) {
+    return BadRequest(
+        "Failed to validate resource_provider::Call: " + error->message);
+  }
+
+  ContentType acceptType;
+  if (request.acceptsMediaType(APPLICATION_JSON)) {
+    acceptType = ContentType::JSON;
+  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+    acceptType = ContentType::PROTOBUF;
+  } else {
+    return NotAcceptable(
+        string("Expecting 'Accept' to allow ") +
+        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+  }
+
+  switch(call.type()) {
+    case Call::UNKNOWN: {
+      return NotImplemented();
+    }
+
+    case Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
+
+      ok.headers["Content-Type"] = stringify(acceptType);
+      ok.type = http::Response::PIPE;
+      ok.reader = pipe.reader();
+
+      HttpConnection http(pipe.writer(), acceptType);
+      subscribe(http, call.subscribe());
+
+      return ok;
+    }
+
+    case Call::UPDATE: {
+      if (!resourceProviders.contains(call.resource_provider_id())) {
+        return BadRequest("Resource provider cannot be found");
+      }
+
+      auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+
+      update(&resourceProvider, call.update());
+      return Accepted();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
+void ResourceProviderManagerProcess::subscribe(
+    const HttpConnection& http,
+    const Call::Subscribe& subscribe)
+{
+  ResourceProviderInfo resourceProviderInfo =
+    subscribe.resource_provider_info();
+  resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
+
+  ResourceProvider resourceProvider(resourceProviderInfo, http);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+      resourceProvider.info.id());
+
+  if (!resourceProvider.http.send(event)) {
+    LOG(WARNING) << "Unable to send event to resource provider "
+                 << stringify(resourceProvider.info.id())
+                 << ": connection closed";
+  }
+
+  resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+}
+
+
+void ResourceProviderManagerProcess::update(
+    ResourceProvider* resourceProvider,
+    const Call::Update& update)
+{
+  // TODO(nfnt): Implement the 'UPDATE' call handler.
+}
+
+
+ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value(UUID::random().toString());
+  return resourceProviderId;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/tests/resource_provider_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_http_api_tests.cpp b/src/tests/resource_provider_http_api_tests.cpp
index 46a3f37..85906ea 100644
--- a/src/tests/resource_provider_http_api_tests.cpp
+++ b/src/tests/resource_provider_http_api_tests.cpp
@@ -14,37 +14,50 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <string>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/clock.hpp>
 #include <process/gmock.hpp>
 #include <process/http.hpp>
 
+#include <stout/lambda.hpp>
 #include <stout/protobuf.hpp>
+#include <stout/recordio.hpp>
 #include <stout/stringify.hpp>
 
-#include <mesos/v1/resource_provider/resource_provider.hpp>
+#include "common/http.hpp"
+#include "common/recordio.hpp"
 
 #include "slave/slave.hpp"
 
+#include "resource_provider/manager.hpp"
+
 #include "tests/mesos.hpp"
 
 namespace http = process::http;
 
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+
+using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Event;
+
 using process::Clock;
 using process::Future;
 using process::Owned;
 
-using process::http::InternalServerError;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::UnsupportedMediaType;
+
+using std::string;
 
 using testing::Values;
 using testing::WithParamInterface;
 
-using mesos::internal::slave::Slave;
-
-using mesos::master::detector::MasterDetector;
-
-using mesos::v1::resource_provider::Call;
-using mesos::v1::resource_provider::Event;
-
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -61,6 +74,164 @@ INSTANTIATE_TEST_CASE_P(
     Values(ContentType::PROTOBUF, ContentType::JSON));
 
 
+TEST_F(ResourceProviderHttpApiTest, NoContentType)
+{
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Expecting 'Content-Type' to be present",
+      response);
+}
+
+
+// This test sends a valid JSON blob that does not describe a valid
+// 'Call'; the request fails validation, resulting in a BadRequest.
+TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf)
+{
+  JSON::Object object;
+  object.values["string"] = "valid_json";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = APPLICATION_JSON;
+  request.headers["Content-Type"] = APPLICATION_JSON;
+  request.body = stringify(object);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Failed to validate resource_provider::Call: "
+      "Expecting 'type' to be present",
+      response);
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, MalformedContent)
+{
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = "MALFORMED_CONTENT";
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  switch (contentType) {
+    case ContentType::PROTOBUF:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into Call protobuf",
+          response);
+      break;
+    case ContentType::JSON:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into JSON: "
+          "syntax error at line 1 near: MALFORMED_CONTENT",
+          response);
+      break;
+    case ContentType::RECORDIO:
+      break;
+  }
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+  const string unknownMediaType = "application/unknown-media-type";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = unknownMediaType;
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
+    << response->body;
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, Subscribe)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
+}
+
+
+// This test starts an agent and connects directly with its resource
+// provider endpoint.
 TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -70,8 +241,8 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
+  ASSERT_SOME(agent);
 
   AWAIT_READY(__recover);
 
@@ -95,15 +266,32 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
   http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
   headers["Accept"] = stringify(contentType);
 
-  Future<http::Response> response = http::post(
-      slave.get()->pid,
+  Future<http::Response> response = http::streaming::post(
+      agent.get()->pid,
       "api/v1/resource_provider",
       headers,
       serialize(contentType, call),
       stringify(contentType));
 
-  // TODO(jieyu): Adjust the expectation once we add implementation.
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(InternalServerError().status, response);
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
 }
 
 } // namespace tests {