You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/07/27 00:15:30 UTC

[1/3] mesos git commit: Added evolve functions for resource provider Event/Call.

Repository: mesos
Updated Branches:
  refs/heads/master d0c8d43e1 -> 6827a4839


Added evolve functions for resource provider Event/Call.

Review: https://reviews.apache.org/r/60769/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/610ef401
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/610ef401
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/610ef401

Branch: refs/heads/master
Commit: 610ef401fead9a028108e3b1ef11544166a6c5f2
Parents: d0c8d43
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:44 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:44 2017 -0700

----------------------------------------------------------------------
 src/internal/evolve.cpp | 13 +++++++++++++
 src/internal/evolve.hpp | 10 ++++++++++
 2 files changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/610ef401/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 3ac55ac..cb1c0eb 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -231,6 +231,19 @@ v1::master::Response evolve(const mesos::master::Response& response)
 }
 
 
+v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call)
+{
+  return evolve<v1::resource_provider::Call>(call);
+}
+
+
+v1::resource_provider::Event evolve(
+    const mesos::resource_provider::Event& event)
+{
+  return evolve<v1::resource_provider::Event>(event);
+}
+
+
 v1::scheduler::Call evolve(const scheduler::Call& call)
 {
   return evolve<v1::scheduler::Call>(call);

http://git-wip-us.apache.org/repos/asf/mesos/blob/610ef401/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 42ead34..d796f32 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -32,6 +32,8 @@
 
 #include <mesos/maintenance/maintenance.hpp>
 
+#include <mesos/resource_provider/resource_provider.hpp>
+
 #include <mesos/scheduler/scheduler.hpp>
 
 #include <mesos/v1/mesos.hpp>
@@ -43,6 +45,8 @@
 
 #include <mesos/v1/master/master.hpp>
 
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <mesos/v1/maintenance/maintenance.hpp>
 
 #include <mesos/v1/scheduler/scheduler.hpp>
@@ -87,6 +91,12 @@ v1::maintenance::Schedule evolve(const maintenance::Schedule& schedule);
 
 v1::master::Response evolve(const mesos::master::Response& response);
 
+
+v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call);
+v1::resource_provider::Event evolve(
+    const mesos::resource_provider::Event& event);
+
+
 v1::scheduler::Call evolve(const scheduler::Call& call);
 
 


[3/3] mesos git commit: Implemented the 'SUBSCRIBE' call in the resource provider manager.

Posted by ji...@apache.org.
Implemented the 'SUBSCRIBE' call in the resource provider manager.

Using HTTP, resource providers are expected to subscribe to the resource
provider manager. The resource provider manager will assign a unique ID
to subscribed resource providers and use events to communicate with
them.

Review: https://reviews.apache.org/r/60771/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6827a483
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6827a483
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6827a483

Branch: refs/heads/master
Commit: 6827a48398a4db9306de2e23ed14f5abbb3780b2
Parents: 7528f59
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:51 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:51 2017 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp              | 237 +++++++++++++++++++-
 src/tests/resource_provider_http_api_tests.cpp | 218 ++++++++++++++++--
 2 files changed, 437 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a9bf258..44e1576 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -18,12 +18,38 @@
 
 #include <glog/logging.h>
 
+#include <string>
+#include <utility>
+
+#include <mesos/http.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/hashmap.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "resource_provider/validation.hpp"
+
 namespace http = process::http;
 
+using mesos::internal::resource_provider::validation::call::validate;
+
+using mesos::resource_provider::Call;
+using mesos::resource_provider::Event;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -36,31 +62,236 @@ using process::spawn;
 using process::terminate;
 using process::wait;
 
+using process::http::Accepted;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::MethodNotAllowed;
+using process::http::NotAcceptable;
+using process::http::NotImplemented;
+using process::http::Pipe;
+using process::http::UnsupportedMediaType;
+
 using process::http::authentication::Principal;
 
+using std::string;
+
+
 namespace mesos {
 namespace internal {
 
+// Represents the streaming HTTP connection to a resource provider.
+struct HttpConnection
+{
+  HttpConnection(const http::Pipe::Writer& _writer,
+                 ContentType _contentType)
+    : writer(_writer),
+      contentType(_contentType),
+      encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+
+  // Converts the message to an Event before sending.
+  template <typename Message>
+  bool send(const Message& message)
+  {
+    // We need to evolve the internal 'message' into a
+    // 'v1::resource_provider::Event'.
+    return writer.write(encoder.encode(evolve(message)));
+  }
+
+  bool close()
+  {
+    return writer.close();
+  }
+
+  Future<Nothing> closed() const
+  {
+    return writer.readerClosed();
+  }
+
+  http::Pipe::Writer writer;
+  ContentType contentType;
+  ::recordio::Encoder<v1::resource_provider::Event> encoder;
+};
+
+
+struct ResourceProvider
+{
+  ResourceProvider(
+      const ResourceProviderInfo& _info,
+      const HttpConnection& _http)
+    : info(_info), http(_http) {}
+
+  ResourceProviderInfo info;
+  HttpConnection http;
+};
+
+
 class ResourceProviderManagerProcess
   : public Process<ResourceProviderManagerProcess>
 {
 public:
-  ResourceProviderManagerProcess()
-    : ProcessBase(process::ID::generate("resource-provider-manager")) {}
+  ResourceProviderManagerProcess();
 
   Future<http::Response> api(
       const http::Request& request,
       const Option<Principal>& principal);
 
   Queue<ResourceProviderMessage> messages;
+
+  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+
+private:
+  void subscribe(
+      const HttpConnection& http,
+      const Call::Subscribe& subscribe);
+
+  void update(
+      ResourceProvider* resourceProvider,
+      const Call::Update& update);
+
+  ResourceProviderID newResourceProviderId();
 };
 
 
+ResourceProviderManagerProcess::ResourceProviderManagerProcess()
+  : ProcessBase(process::ID::generate("resource-provider-manager"))
+{
+}
+
+
 Future<http::Response> ResourceProviderManagerProcess::api(
     const http::Request& request,
     const Option<Principal>& principal)
 {
-  return Failure("Unimplemented");
+  if (request.method != "POST") {
+    return MethodNotAllowed({"POST"}, request.method);
+  }
+
+  v1::resource_provider::Call v1Call;
+
+  // TODO(anand): Content type values are case-insensitive.
+  Option<string> contentType = request.headers.get("Content-Type");
+
+  if (contentType.isNone()) {
+    return BadRequest("Expecting 'Content-Type' to be present");
+  }
+
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!v1Call.ParseFromString(request.body)) {
+      return BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+    if (value.isError()) {
+      return BadRequest("Failed to parse body into JSON: " + value.error());
+    }
+
+    Try<v1::resource_provider::Call> parse =
+      ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+    if (parse.isError()) {
+      return BadRequest("Failed to convert JSON into Call protobuf: " +
+                        parse.error());
+    }
+
+    v1Call = parse.get();
+  } else {
+    return UnsupportedMediaType(
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+  }
+
+  Call call = devolve(v1Call);
+
+  Option<Error> error = validate(call);
+  if (error.isSome()) {
+    return BadRequest(
+        "Failed to validate resource_provider::Call: " + error->message);
+  }
+
+  ContentType acceptType;
+  if (request.acceptsMediaType(APPLICATION_JSON)) {
+    acceptType = ContentType::JSON;
+  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+    acceptType = ContentType::PROTOBUF;
+  } else {
+    return NotAcceptable(
+        string("Expecting 'Accept' to allow ") +
+        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+  }
+
+  switch(call.type()) {
+    case Call::UNKNOWN: {
+      return NotImplemented();
+    }
+
+    case Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
+
+      ok.headers["Content-Type"] = stringify(acceptType);
+      ok.type = http::Response::PIPE;
+      ok.reader = pipe.reader();
+
+      HttpConnection http(pipe.writer(), acceptType);
+      subscribe(http, call.subscribe());
+
+      return ok;
+    }
+
+    case Call::UPDATE: {
+      if (!resourceProviders.contains(call.resource_provider_id())) {
+        return BadRequest("Resource provider cannot be found");
+      }
+
+      auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+
+      update(&resourceProvider, call.update());
+      return Accepted();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
+void ResourceProviderManagerProcess::subscribe(
+    const HttpConnection& http,
+    const Call::Subscribe& subscribe)
+{
+  ResourceProviderInfo resourceProviderInfo =
+    subscribe.resource_provider_info();
+  resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
+
+  ResourceProvider resourceProvider(resourceProviderInfo, http);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+      resourceProvider.info.id());
+
+  if (!resourceProvider.http.send(event)) {
+    LOG(WARNING) << "Unable to send event to resource provider "
+                 << stringify(resourceProvider.info.id())
+                 << ": connection closed";
+  }
+
+  resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+}
+
+
+void ResourceProviderManagerProcess::update(
+    ResourceProvider* resourceProvider,
+    const Call::Update& update)
+{
+  // TODO(nfnt): Implement the 'UPDATE' call handler.
+}
+
+
+ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value(UUID::random().toString());
+  return resourceProviderId;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/tests/resource_provider_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_http_api_tests.cpp b/src/tests/resource_provider_http_api_tests.cpp
index 46a3f37..85906ea 100644
--- a/src/tests/resource_provider_http_api_tests.cpp
+++ b/src/tests/resource_provider_http_api_tests.cpp
@@ -14,37 +14,50 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <string>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/clock.hpp>
 #include <process/gmock.hpp>
 #include <process/http.hpp>
 
+#include <stout/lambda.hpp>
 #include <stout/protobuf.hpp>
+#include <stout/recordio.hpp>
 #include <stout/stringify.hpp>
 
-#include <mesos/v1/resource_provider/resource_provider.hpp>
+#include "common/http.hpp"
+#include "common/recordio.hpp"
 
 #include "slave/slave.hpp"
 
+#include "resource_provider/manager.hpp"
+
 #include "tests/mesos.hpp"
 
 namespace http = process::http;
 
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+
+using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Event;
+
 using process::Clock;
 using process::Future;
 using process::Owned;
 
-using process::http::InternalServerError;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::UnsupportedMediaType;
+
+using std::string;
 
 using testing::Values;
 using testing::WithParamInterface;
 
-using mesos::internal::slave::Slave;
-
-using mesos::master::detector::MasterDetector;
-
-using mesos::v1::resource_provider::Call;
-using mesos::v1::resource_provider::Event;
-
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -61,6 +74,164 @@ INSTANTIATE_TEST_CASE_P(
     Values(ContentType::PROTOBUF, ContentType::JSON));
 
 
+TEST_F(ResourceProviderHttpApiTest, NoContentType)
+{
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Expecting 'Content-Type' to be present",
+      response);
+}
+
+
+// This test sends a valid JSON blob that cannot be deserialized
+// into a valid protobuf resulting in a BadRequest.
+TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf)
+{
+  JSON::Object object;
+  object.values["string"] = "valid_json";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = APPLICATION_JSON;
+  request.headers["Content-Type"] = APPLICATION_JSON;
+  request.body = stringify(object);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Failed to validate resource_provider::Call: "
+      "Expecting 'type' to be present",
+      response);
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, MalformedContent)
+{
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = "MALFORMED_CONTENT";
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  switch (contentType) {
+    case ContentType::PROTOBUF:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into Call protobuf",
+          response);
+      break;
+    case ContentType::JSON:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into JSON: "
+          "syntax error at line 1 near: MALFORMED_CONTENT",
+          response);
+      break;
+    case ContentType::RECORDIO:
+      break;
+  }
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+  const string unknownMediaType = "application/unknown-media-type";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = unknownMediaType;
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
+    << response->body;
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, Subscribe)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
+}
+
+
+// This test starts an agent and connects directly with its resource
+// provider endpoint.
 TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -70,8 +241,8 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
+  ASSERT_SOME(agent);
 
   AWAIT_READY(__recover);
 
@@ -95,15 +266,32 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
   http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
   headers["Accept"] = stringify(contentType);
 
-  Future<http::Response> response = http::post(
-      slave.get()->pid,
+  Future<http::Response> response = http::streaming::post(
+      agent.get()->pid,
       "api/v1/resource_provider",
       headers,
       serialize(contentType, call),
       stringify(contentType));
 
-  // TODO(jieyu): Adjust the expectation once we add implementation.
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(InternalServerError().status, response);
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
 }
 
 } // namespace tests {


[2/3] mesos git commit: Added validation functions for resource provider calls.

Posted by ji...@apache.org.
Added validation functions for resource provider calls.

Review: https://reviews.apache.org/r/60770/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7528f595
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7528f595
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7528f595

Branch: refs/heads/master
Commit: 7528f595534c9c182ee82468a3e0e8520449283f
Parents: 610ef40
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:48 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:48 2017 -0700

----------------------------------------------------------------------
 src/CMakeLists.txt                              |  1 +
 src/Makefile.am                                 |  3 +
 src/internal/devolve.cpp                        |  9 ++-
 src/internal/devolve.hpp                        |  5 +-
 src/resource_provider/validation.cpp            | 73 +++++++++++++++++
 src/resource_provider/validation.hpp            | 39 +++++++++
 src/tests/CMakeLists.txt                        |  1 +
 .../resource_provider_validation_tests.cpp      | 83 ++++++++++++++++++++
 8 files changed, 208 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 51b6742..98ccaf4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -403,6 +403,7 @@ set(RESOURCE_PROVIDER_SRC
   resource_provider/driver.cpp
   resource_provider/local.cpp
   resource_provider/manager.cpp
+  resource_provider/validation.cpp
   resource_provider/storage/provider.cpp
   )
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 8b91716..5712bad 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -956,6 +956,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/driver.cpp						\
   resource_provider/local.cpp						\
   resource_provider/manager.cpp						\
+  resource_provider/validation.cpp					\
   resource_provider/storage/provider.cpp				\
   sched/sched.cpp							\
   scheduler/scheduler.cpp						\
@@ -1094,6 +1095,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/local.hpp						\
   resource_provider/manager.hpp						\
   resource_provider/message.hpp						\
+  resource_provider/validation.hpp					\
   resource_provider/storage/provider.hpp				\
   sched/constants.hpp							\
   sched/flags.hpp							\
@@ -2357,6 +2359,7 @@ mesos_tests_SOURCES =						\
   tests/reservation_tests.cpp					\
   tests/resource_offers_tests.cpp				\
   tests/resource_provider_http_api_tests.cpp			\
+  tests/resource_provider_validation_tests.cpp			\
   tests/resources_tests.cpp					\
   tests/resources_utils.cpp					\
   tests/role_tests.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 79c668f..7eb58e9 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -166,15 +166,16 @@ executor::Event devolve(const v1::executor::Event& event)
 }
 
 
-resource_provider::Call devolve(const v1::resource_provider::Call& call)
+mesos::resource_provider::Call devolve(const v1::resource_provider::Call& call)
 {
-  return devolve<resource_provider::Call>(call);
+  return devolve<mesos::resource_provider::Call>(call);
 }
 
 
-resource_provider::Event devolve(const v1::resource_provider::Event& event)
+mesos::resource_provider::Event devolve(
+    const v1::resource_provider::Event& event)
 {
-  return devolve<resource_provider::Event>(event);
+  return devolve<mesos::resource_provider::Event>(event);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 214fcef..656173d 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -67,8 +67,9 @@ SlaveInfo devolve(const v1::AgentInfo& agentInfo);
 TaskID devolve(const v1::TaskID& taskId);
 TaskStatus devolve(const v1::TaskStatus& status);
 
-resource_provider::Call devolve(const v1::resource_provider::Call& call);
-resource_provider::Event devolve(const v1::resource_provider::Event& event);
+mesos::resource_provider::Call devolve(const v1::resource_provider::Call& call);
+mesos::resource_provider::Event devolve(
+    const v1::resource_provider::Event& event);
 
 scheduler::Call devolve(const v1::scheduler::Call& call);
 scheduler::Event devolve(const v1::scheduler::Event& event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/resource_provider/validation.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.cpp b/src/resource_provider/validation.cpp
new file mode 100644
index 0000000..d292722
--- /dev/null
+++ b/src/resource_provider/validation.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/validation.hpp"
+
+#include <stout/none.hpp>
+#include <stout/unreachable.hpp>
+
+using mesos::resource_provider::Call;
+
+namespace mesos {
+namespace internal {
+namespace resource_provider {
+namespace validation {
+namespace call {
+
+Option<Error> validate(const Call& call)
+{
+  if (!call.IsInitialized()) {
+    return Error("Not initialized: " + call.InitializationErrorString());
+  }
+
+  if (!call.has_type()) {
+    return Error("Expecting 'type' to be present");
+  }
+
+  switch(call.type()) {
+    case Call::UNKNOWN: {
+      return None();
+    }
+
+    case Call::SUBSCRIBE: {
+      if (!call.has_subscribe()) {
+        return Error("Expecting 'subscribe' to be present");
+      }
+
+      return None();
+    }
+
+    case Call::UPDATE: {
+      if (!call.has_resource_provider_id()) {
+        return Error("Expecting 'resource_provider_id' to be present");
+      }
+
+      if (!call.has_update()) {
+        return Error("Expecting 'update' to be present");
+      }
+
+      return None();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+} // namespace call {
+} // namespace validation {
+} // namespace resource_provider {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/resource_provider/validation.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.hpp b/src/resource_provider/validation.hpp
new file mode 100644
index 0000000..0068f56
--- /dev/null
+++ b/src/resource_provider/validation.hpp
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_VALIDATION_HPP__
+#define __RESOURCE_PROVIDER_VALIDATION_HPP__
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+
+namespace mesos {
+namespace internal {
+namespace resource_provider {
+namespace validation {
+namespace call {
+
+Option<Error> validate(const mesos::resource_provider::Call& call);
+
+} // namespace call {
+} // namespace validation {
+} // namespace resource_provider {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_VALIDATION_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index f5fe5a6..6dd2716 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -116,6 +116,7 @@ set(MESOS_TESTS_SRC
   rate_limiting_tests.cpp
   resource_offers_tests.cpp
   resource_provider_http_api_tests.cpp
+  resource_provider_validation_tests.cpp
   resources_tests.cpp
   role_tests.cpp
   scheduler_driver_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/tests/resource_provider_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp
new file mode 100644
index 0000000..f182bff
--- /dev/null
+++ b/src/tests/resource_provider_validation_tests.cpp
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <stout/error.hpp>
+#include <stout/gtest.hpp>
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "resource_provider/validation.hpp"
+
+namespace call = mesos::internal::resource_provider::validation::call;
+
+using mesos::resource_provider::Call;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+TEST(ResourceProviderCallValidationTest, Subscribe)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  // Expecting `Call::Subscribe`.
+  Option<Error> error = call::validate(call);
+  EXPECT_SOME(error);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  ResourceProviderInfo* info = subscribe->mutable_resource_provider_info();
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  error = call::validate(call);
+  EXPECT_NONE(error);
+}
+
+
+TEST(ResourceProviderCallValidationTest, Update)
+{
+  Call call;
+  call.set_type(Call::UPDATE);
+
+  // Expecting a resource provider ID and `Call::Update`.
+  Option<Error> error = call::validate(call);
+  EXPECT_SOME(error);
+
+  ResourceProviderID* id = call.mutable_resource_provider_id();
+  id->set_value(UUID::random().toString());
+
+  // Still expecting `Call::Update`.
+  error = call::validate(call);
+  EXPECT_SOME(error);
+
+  Call::Update* update = call.mutable_update();
+  update->set_state(Call::Update::OK);
+  update->mutable_operation();
+
+  error = call::validate(call);
+  EXPECT_NONE(error);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {