You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/07/27 00:15:30 UTC
[1/3] mesos git commit: Added evolve functions for resource provider
Event/Call.
Repository: mesos
Updated Branches:
refs/heads/master d0c8d43e1 -> 6827a4839
Added evolve functions for resource provider Event/Call.
Review: https://reviews.apache.org/r/60769/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/610ef401
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/610ef401
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/610ef401
Branch: refs/heads/master
Commit: 610ef401fead9a028108e3b1ef11544166a6c5f2
Parents: d0c8d43
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:44 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:44 2017 -0700
----------------------------------------------------------------------
src/internal/evolve.cpp | 13 +++++++++++++
src/internal/evolve.hpp | 10 ++++++++++
2 files changed, 23 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/610ef401/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 3ac55ac..cb1c0eb 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -231,6 +231,19 @@ v1::master::Response evolve(const mesos::master::Response& response)
}
+v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call)
+{
+ return evolve<v1::resource_provider::Call>(call);
+}
+
+
+v1::resource_provider::Event evolve(
+ const mesos::resource_provider::Event& event)
+{
+ return evolve<v1::resource_provider::Event>(event);
+}
+
+
v1::scheduler::Call evolve(const scheduler::Call& call)
{
return evolve<v1::scheduler::Call>(call);
http://git-wip-us.apache.org/repos/asf/mesos/blob/610ef401/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 42ead34..d796f32 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -32,6 +32,8 @@
#include <mesos/maintenance/maintenance.hpp>
+#include <mesos/resource_provider/resource_provider.hpp>
+
#include <mesos/scheduler/scheduler.hpp>
#include <mesos/v1/mesos.hpp>
@@ -43,6 +45,8 @@
#include <mesos/v1/master/master.hpp>
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
#include <mesos/v1/maintenance/maintenance.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
@@ -87,6 +91,12 @@ v1::maintenance::Schedule evolve(const maintenance::Schedule& schedule);
v1::master::Response evolve(const mesos::master::Response& response);
+
+v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call);
+v1::resource_provider::Event evolve(
+ const mesos::resource_provider::Event& event);
+
+
v1::scheduler::Call evolve(const scheduler::Call& call);
[3/3] mesos git commit: Implemented the 'SUBSCRIBE' call in the
resource provider manager.
Posted by ji...@apache.org.
Implemented the 'SUBSCRIBE' call in the resource provider manager.
Using HTTP, resource providers are expected to subscribe to the resource
provider manager. The resource provider manager will assign a unique ID
to subscribed resource providers and use events to communicate with
them.
Review: https://reviews.apache.org/r/60771/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6827a483
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6827a483
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6827a483
Branch: refs/heads/master
Commit: 6827a48398a4db9306de2e23ed14f5abbb3780b2
Parents: 7528f59
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:51 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:51 2017 -0700
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 237 +++++++++++++++++++-
src/tests/resource_provider_http_api_tests.cpp | 218 ++++++++++++++++--
2 files changed, 437 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a9bf258..44e1576 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -18,12 +18,38 @@
#include <glog/logging.h>
+#include <string>
+#include <utility>
+
+#include <mesos/http.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "resource_provider/validation.hpp"
+
namespace http = process::http;
+using mesos::internal::resource_provider::validation::call::validate;
+
+using mesos::resource_provider::Call;
+using mesos::resource_provider::Event;
+
using process::Failure;
using process::Future;
using process::Owned;
@@ -36,31 +62,236 @@ using process::spawn;
using process::terminate;
using process::wait;
+using process::http::Accepted;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::MethodNotAllowed;
+using process::http::NotAcceptable;
+using process::http::NotImplemented;
+using process::http::Pipe;
+using process::http::UnsupportedMediaType;
+
using process::http::authentication::Principal;
+using std::string;
+
+
namespace mesos {
namespace internal {
+// Represents the streaming HTTP connection to a resource provider.
+struct HttpConnection
+{
+ HttpConnection(const http::Pipe::Writer& _writer,
+ ContentType _contentType)
+ : writer(_writer),
+ contentType(_contentType),
+ encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+
+ // Converts the message to an Event before sending.
+ template <typename Message>
+ bool send(const Message& message)
+ {
+ // We need to evolve the internal 'message' into a
+ // 'v1::resource_provider::Event'.
+ return writer.write(encoder.encode(evolve(message)));
+ }
+
+ bool close()
+ {
+ return writer.close();
+ }
+
+ Future<Nothing> closed() const
+ {
+ return writer.readerClosed();
+ }
+
+ http::Pipe::Writer writer;
+ ContentType contentType;
+ ::recordio::Encoder<v1::resource_provider::Event> encoder;
+};
+
+
+struct ResourceProvider
+{
+ ResourceProvider(
+ const ResourceProviderInfo& _info,
+ const HttpConnection& _http)
+ : info(_info), http(_http) {}
+
+ ResourceProviderInfo info;
+ HttpConnection http;
+};
+
+
class ResourceProviderManagerProcess
: public Process<ResourceProviderManagerProcess>
{
public:
- ResourceProviderManagerProcess()
- : ProcessBase(process::ID::generate("resource-provider-manager")) {}
+ ResourceProviderManagerProcess();
Future<http::Response> api(
const http::Request& request,
const Option<Principal>& principal);
Queue<ResourceProviderMessage> messages;
+
+ hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+
+private:
+ void subscribe(
+ const HttpConnection& http,
+ const Call::Subscribe& subscribe);
+
+ void update(
+ ResourceProvider* resourceProvider,
+ const Call::Update& update);
+
+ ResourceProviderID newResourceProviderId();
};
+ResourceProviderManagerProcess::ResourceProviderManagerProcess()
+ : ProcessBase(process::ID::generate("resource-provider-manager"))
+{
+}
+
+
Future<http::Response> ResourceProviderManagerProcess::api(
const http::Request& request,
const Option<Principal>& principal)
{
- return Failure("Unimplemented");
+ if (request.method != "POST") {
+ return MethodNotAllowed({"POST"}, request.method);
+ }
+
+ v1::resource_provider::Call v1Call;
+
+ // TODO(anand): Content type values are case-insensitive.
+ Option<string> contentType = request.headers.get("Content-Type");
+
+ if (contentType.isNone()) {
+ return BadRequest("Expecting 'Content-Type' to be present");
+ }
+
+ if (contentType.get() == APPLICATION_PROTOBUF) {
+ if (!v1Call.ParseFromString(request.body)) {
+ return BadRequest("Failed to parse body into Call protobuf");
+ }
+ } else if (contentType.get() == APPLICATION_JSON) {
+ Try<JSON::Value> value = JSON::parse(request.body);
+ if (value.isError()) {
+ return BadRequest("Failed to parse body into JSON: " + value.error());
+ }
+
+ Try<v1::resource_provider::Call> parse =
+ ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+ if (parse.isError()) {
+ return BadRequest("Failed to convert JSON into Call protobuf: " +
+ parse.error());
+ }
+
+ v1Call = parse.get();
+ } else {
+ return UnsupportedMediaType(
+ string("Expecting 'Content-Type' of ") +
+ APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+ }
+
+ Call call = devolve(v1Call);
+
+ Option<Error> error = validate(call);
+ if (error.isSome()) {
+ return BadRequest(
+ "Failed to validate resource_provider::Call: " + error->message);
+ }
+
+ ContentType acceptType;
+ if (request.acceptsMediaType(APPLICATION_JSON)) {
+ acceptType = ContentType::JSON;
+ } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+ acceptType = ContentType::PROTOBUF;
+ } else {
+ return NotAcceptable(
+ string("Expecting 'Accept' to allow ") +
+ "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+ }
+
+ switch(call.type()) {
+ case Call::UNKNOWN: {
+ return NotImplemented();
+ }
+
+ case Call::SUBSCRIBE: {
+ Pipe pipe;
+ OK ok;
+
+ ok.headers["Content-Type"] = stringify(acceptType);
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ HttpConnection http(pipe.writer(), acceptType);
+ subscribe(http, call.subscribe());
+
+ return ok;
+ }
+
+ case Call::UPDATE: {
+ if (!resourceProviders.contains(call.resource_provider_id())) {
+ return BadRequest("Resource provider cannot be found");
+ }
+
+ auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+
+ update(&resourceProvider, call.update());
+ return Accepted();
+ }
+ }
+
+ UNREACHABLE();
+}
+
+
+void ResourceProviderManagerProcess::subscribe(
+ const HttpConnection& http,
+ const Call::Subscribe& subscribe)
+{
+ ResourceProviderInfo resourceProviderInfo =
+ subscribe.resource_provider_info();
+ resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
+
+ ResourceProvider resourceProvider(resourceProviderInfo, http);
+
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+ event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+ resourceProvider.info.id());
+
+ if (!resourceProvider.http.send(event)) {
+ LOG(WARNING) << "Unable to send event to resource provider "
+ << stringify(resourceProvider.info.id())
+ << ": connection closed";
+ }
+
+ resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+}
+
+
+void ResourceProviderManagerProcess::update(
+ ResourceProvider* resourceProvider,
+ const Call::Update& update)
+{
+ // TODO(nfnt): Implement the 'UPDATE' call handler.
+}
+
+
+ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
+{
+ ResourceProviderID resourceProviderId;
+ resourceProviderId.set_value(UUID::random().toString());
+ return resourceProviderId;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6827a483/src/tests/resource_provider_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_http_api_tests.cpp b/src/tests/resource_provider_http_api_tests.cpp
index 46a3f37..85906ea 100644
--- a/src/tests/resource_provider_http_api_tests.cpp
+++ b/src/tests/resource_provider_http_api_tests.cpp
@@ -14,37 +14,50 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <string>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
#include <process/clock.hpp>
#include <process/gmock.hpp>
#include <process/http.hpp>
+#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>
+#include <stout/recordio.hpp>
#include <stout/stringify.hpp>
-#include <mesos/v1/resource_provider/resource_provider.hpp>
+#include "common/http.hpp"
+#include "common/recordio.hpp"
#include "slave/slave.hpp"
+#include "resource_provider/manager.hpp"
+
#include "tests/mesos.hpp"
namespace http = process::http;
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+
+using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Event;
+
using process::Clock;
using process::Future;
using process::Owned;
-using process::http::InternalServerError;
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::UnsupportedMediaType;
+
+using std::string;
using testing::Values;
using testing::WithParamInterface;
-using mesos::internal::slave::Slave;
-
-using mesos::master::detector::MasterDetector;
-
-using mesos::v1::resource_provider::Call;
-using mesos::v1::resource_provider::Event;
-
namespace mesos {
namespace internal {
namespace tests {
@@ -61,6 +74,164 @@ INSTANTIATE_TEST_CASE_P(
Values(ContentType::PROTOBUF, ContentType::JSON));
+TEST_F(ResourceProviderHttpApiTest, NoContentType)
+{
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ ResourceProviderManager manager;
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(
+ "Expecting 'Content-Type' to be present",
+ response);
+}
+
+
+// This test sends a valid JSON blob that cannot be deserialized
+// into a valid protobuf resulting in a BadRequest.
+TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf)
+{
+ JSON::Object object;
+ object.values["string"] = "valid_json";
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = APPLICATION_JSON;
+ request.headers["Content-Type"] = APPLICATION_JSON;
+ request.body = stringify(object);
+
+ ResourceProviderManager manager;
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(
+ "Failed to validate resource_provider::Call: "
+ "Expecting 'type' to be present",
+ response);
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, MalformedContent)
+{
+ const ContentType contentType = GetParam();
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = "MALFORMED_CONTENT";
+
+ ResourceProviderManager manager;
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+ switch (contentType) {
+ case ContentType::PROTOBUF:
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(
+ "Failed to parse body into Call protobuf",
+ response);
+ break;
+ case ContentType::JSON:
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(
+ "Failed to parse body into JSON: "
+ "syntax error at line 1 near: MALFORMED_CONTENT",
+ response);
+ break;
+ case ContentType::RECORDIO:
+ break;
+ }
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType)
+{
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ const ContentType contentType = GetParam();
+ const string unknownMediaType = "application/unknown-media-type";
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = unknownMediaType;
+ request.body = serialize(contentType, call);
+
+ ResourceProviderManager manager;
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
+ << response->body;
+}
+
+
+TEST_P(ResourceProviderHttpApiTest, Subscribe)
+{
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ const ContentType contentType = GetParam();
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ ResourceProviderManager manager;
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ recordio::Reader<Event> responseDecoder(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get());
+
+ Future<Result<Event>> event = responseDecoder.read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+ ASSERT_NE("", event->get().subscribed().provider_id().value());
+}
+
+
+// This test starts an agent and connects directly with its resource
+// provider endpoint.
TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
{
Try<Owned<cluster::Master>> master = StartMaster();
@@ -70,8 +241,8 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
- ASSERT_SOME(slave);
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
+ ASSERT_SOME(agent);
AWAIT_READY(__recover);
@@ -95,15 +266,32 @@ TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
- Future<http::Response> response = http::post(
- slave.get()->pid,
+ Future<http::Response> response = http::streaming::post(
+ agent.get()->pid,
"api/v1/resource_provider",
headers,
serialize(contentType, call),
stringify(contentType));
- // TODO(jieyu): Adjust the expectation once we add implementation.
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(InternalServerError().status, response);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ recordio::Reader<Event> responseDecoder(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get());
+
+ Future<Result<Event>> event = responseDecoder.read();
+ AWAIT_READY(event);
+ ASSERT_SOME(event.get());
+
+ // Check event type is subscribed and the resource provider id is set.
+ ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+ ASSERT_NE("", event->get().subscribed().provider_id().value());
}
} // namespace tests {
[2/3] mesos git commit: Added validation functions for resource
provider calls.
Posted by ji...@apache.org.
Added validation functions for resource provider calls.
Review: https://reviews.apache.org/r/60770/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7528f595
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7528f595
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7528f595
Branch: refs/heads/master
Commit: 7528f595534c9c182ee82468a3e0e8520449283f
Parents: 610ef40
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jul 26 17:04:48 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jul 26 17:04:48 2017 -0700
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/Makefile.am | 3 +
src/internal/devolve.cpp | 9 ++-
src/internal/devolve.hpp | 5 +-
src/resource_provider/validation.cpp | 73 +++++++++++++++++
src/resource_provider/validation.hpp | 39 +++++++++
src/tests/CMakeLists.txt | 1 +
.../resource_provider_validation_tests.cpp | 83 ++++++++++++++++++++
8 files changed, 208 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 51b6742..98ccaf4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -403,6 +403,7 @@ set(RESOURCE_PROVIDER_SRC
resource_provider/driver.cpp
resource_provider/local.cpp
resource_provider/manager.cpp
+ resource_provider/validation.cpp
resource_provider/storage/provider.cpp
)
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 8b91716..5712bad 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -956,6 +956,7 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/driver.cpp \
resource_provider/local.cpp \
resource_provider/manager.cpp \
+ resource_provider/validation.cpp \
resource_provider/storage/provider.cpp \
sched/sched.cpp \
scheduler/scheduler.cpp \
@@ -1094,6 +1095,7 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/local.hpp \
resource_provider/manager.hpp \
resource_provider/message.hpp \
+ resource_provider/validation.hpp \
resource_provider/storage/provider.hpp \
sched/constants.hpp \
sched/flags.hpp \
@@ -2357,6 +2359,7 @@ mesos_tests_SOURCES = \
tests/reservation_tests.cpp \
tests/resource_offers_tests.cpp \
tests/resource_provider_http_api_tests.cpp \
+ tests/resource_provider_validation_tests.cpp \
tests/resources_tests.cpp \
tests/resources_utils.cpp \
tests/role_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 79c668f..7eb58e9 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -166,15 +166,16 @@ executor::Event devolve(const v1::executor::Event& event)
}
-resource_provider::Call devolve(const v1::resource_provider::Call& call)
+mesos::resource_provider::Call devolve(const v1::resource_provider::Call& call)
{
- return devolve<resource_provider::Call>(call);
+ return devolve<mesos::resource_provider::Call>(call);
}
-resource_provider::Event devolve(const v1::resource_provider::Event& event)
+mesos::resource_provider::Event devolve(
+ const v1::resource_provider::Event& event)
{
- return devolve<resource_provider::Event>(event);
+ return devolve<mesos::resource_provider::Event>(event);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 214fcef..656173d 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -67,8 +67,9 @@ SlaveInfo devolve(const v1::AgentInfo& agentInfo);
TaskID devolve(const v1::TaskID& taskId);
TaskStatus devolve(const v1::TaskStatus& status);
-resource_provider::Call devolve(const v1::resource_provider::Call& call);
-resource_provider::Event devolve(const v1::resource_provider::Event& event);
+mesos::resource_provider::Call devolve(const v1::resource_provider::Call& call);
+mesos::resource_provider::Event devolve(
+ const v1::resource_provider::Event& event);
scheduler::Call devolve(const v1::scheduler::Call& call);
scheduler::Event devolve(const v1::scheduler::Event& event);
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/resource_provider/validation.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.cpp b/src/resource_provider/validation.cpp
new file mode 100644
index 0000000..d292722
--- /dev/null
+++ b/src/resource_provider/validation.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/validation.hpp"
+
+#include <stout/none.hpp>
+#include <stout/unreachable.hpp>
+
+using mesos::resource_provider::Call;
+
+namespace mesos {
+namespace internal {
+namespace resource_provider {
+namespace validation {
+namespace call {
+
+Option<Error> validate(const Call& call)
+{
+ if (!call.IsInitialized()) {
+ return Error("Not initialized: " + call.InitializationErrorString());
+ }
+
+ if (!call.has_type()) {
+ return Error("Expecting 'type' to be present");
+ }
+
+ switch(call.type()) {
+ case Call::UNKNOWN: {
+ return None();
+ }
+
+ case Call::SUBSCRIBE: {
+ if (!call.has_subscribe()) {
+ return Error("Expecting 'subscribe' to be present");
+ }
+
+ return None();
+ }
+
+ case Call::UPDATE: {
+ if (!call.has_resource_provider_id()) {
+ return Error("Expecting 'resource_provider_id' to be present");
+ }
+
+ if (!call.has_update()) {
+ return Error("Expecting 'update' to be present");
+ }
+
+ return None();
+ }
+ }
+
+ UNREACHABLE();
+}
+
+} // namespace call {
+} // namespace validation {
+} // namespace resource_provider {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/resource_provider/validation.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/validation.hpp b/src/resource_provider/validation.hpp
new file mode 100644
index 0000000..0068f56
--- /dev/null
+++ b/src/resource_provider/validation.hpp
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_VALIDATION_HPP__
+#define __RESOURCE_PROVIDER_VALIDATION_HPP__
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+
+namespace mesos {
+namespace internal {
+namespace resource_provider {
+namespace validation {
+namespace call {
+
+Option<Error> validate(const mesos::resource_provider::Call& call);
+
+} // namespace call {
+} // namespace validation {
+} // namespace resource_provider {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_VALIDATION_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index f5fe5a6..6dd2716 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -116,6 +116,7 @@ set(MESOS_TESTS_SRC
rate_limiting_tests.cpp
resource_offers_tests.cpp
resource_provider_http_api_tests.cpp
+ resource_provider_validation_tests.cpp
resources_tests.cpp
role_tests.cpp
scheduler_driver_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/7528f595/src/tests/resource_provider_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_validation_tests.cpp b/src/tests/resource_provider_validation_tests.cpp
new file mode 100644
index 0000000..f182bff
--- /dev/null
+++ b/src/tests/resource_provider_validation_tests.cpp
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <stout/error.hpp>
+#include <stout/gtest.hpp>
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "resource_provider/validation.hpp"
+
+namespace call = mesos::internal::resource_provider::validation::call;
+
+using mesos::resource_provider::Call;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+TEST(ResourceProviderCallValidationTest, Subscribe)
+{
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ // Expecting `Call::Subscribe`.
+ Option<Error> error = call::validate(call);
+ EXPECT_SOME(error);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ ResourceProviderInfo* info = subscribe->mutable_resource_provider_info();
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ error = call::validate(call);
+ EXPECT_NONE(error);
+}
+
+
+TEST(ResourceProviderCallValidationTest, Update)
+{
+ Call call;
+ call.set_type(Call::UPDATE);
+
+ // Expecting a resource provider ID and `Call::Update`.
+ Option<Error> error = call::validate(call);
+ EXPECT_SOME(error);
+
+ ResourceProviderID* id = call.mutable_resource_provider_id();
+ id->set_value(UUID::random().toString());
+
+ // Still expecting `Call::Update`.
+ error = call::validate(call);
+ EXPECT_SOME(error);
+
+ Call::Update* update = call.mutable_update();
+ update->set_state(Call::Update::OK);
+ update->mutable_operation();
+
+ error = call::validate(call);
+ EXPECT_NONE(error);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {