You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/07/06 05:53:30 UTC
[1/7] mesos git commit: Implemented 'Subscribed' event for v1 master
event stream.
Repository: mesos
Updated Branches:
refs/heads/master d840f48f9 -> d66f0b1e6
Implemented 'Subscribed' event for v1 master event stream.
This change adds logic for sending a `Subscribed` event containing
the present master state when a client subscribes to the event stream.
Review: https://reviews.apache.org/r/49518/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d66f0b1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d66f0b1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d66f0b1e
Branch: refs/heads/master
Commit: d66f0b1e67f1fea50265d5026a1d89f7dcf85a7d
Parents: 3038809
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:26:00 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
include/mesos/master/master.proto | 17 +++++--
include/mesos/v1/master/master.proto | 17 +++++--
src/master/http.cpp | 39 +++++++++-----
src/tests/api_tests.cpp | 84 ++++++++++++++++++-------------
4 files changed, 102 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index f0c8a56..6aedc2f 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -435,12 +435,20 @@ message Response {
message Event {
enum Type {
UNKNOWN = 0;
- TASK_ADDED = 1; // See `TaskAdded` below.
- TASK_UPDATED = 2; // See `TaskUpdated` below.
+ SUBSCRIBED = 1; // See `Subscribed` below.
+ TASK_ADDED = 2; // See `TaskAdded` below.
+ TASK_UPDATED = 3; // See `TaskUpdated` below.
// TODO(vinod): Fill in more events.
}
+ // First event received when a client subscribes.
+ message Subscribed {
+ // Snapshot of the entire cluster state. Further updates to the
+ // cluster state are sent as separate events on the stream.
+ optional Response.GetState get_state = 1;
+ }
+
// Forwarded by the master when a task becomes known to it. This can happen
// when a new task is launched by the scheduler or when the task becomes
// known to the master upon an agent (re-)registration after a failover.
@@ -459,6 +467,7 @@ message Event {
optional Type type = 1;
- optional TaskAdded task_added = 2;
- optional TaskUpdated task_updated = 3;
+ optional Subscribed subscribed = 2;
+ optional TaskAdded task_added = 3;
+ optional TaskUpdated task_updated = 4;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 0c8cf15..19dbd1a 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -436,12 +436,20 @@ message Response {
message Event {
enum Type {
UNKNOWN = 0;
- TASK_ADDED = 1; // See `TaskAdded` below.
- TASK_UPDATED = 2; // See `TaskUpdated` below.
+ SUBSCRIBED = 1; // See `Subscribed` below.
+ TASK_ADDED = 2; // See `TaskAdded` below.
+ TASK_UPDATED = 3; // See `TaskUpdated` below.
// TODO(vinod): Fill in more events.
}
+ // First event received when a client subscribes.
+ message Subscribed {
+ // Snapshot of the entire cluster state. Further updates to the
+ // cluster state are sent as separate events on the stream.
+ optional Response.GetState get_state = 1;
+ }
+
// Forwarded by the master when a task becomes known to it. This can happen
// when a new task is launched by the scheduler or when the task becomes
// known to the master upon an agent (re-)registration after a failover.
@@ -460,6 +468,7 @@ message Event {
optional Type type = 1;
- optional TaskAdded task_added = 2;
- optional TaskUpdated task_updated = 3;
+ optional Subscribed subscribed = 2;
+ optional TaskAdded task_added = 3;
+ optional TaskUpdated task_updated = 4;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 10b0572..3640486 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -589,18 +589,33 @@ Future<Response> Master::Http::api(
return quotaHandler.remove(call, principal);
case mesos::master::Call::SUBSCRIBE: {
- Pipe pipe;
- OK ok;
-
- ok.headers["Content-Type"] = stringify(acceptType);
- ok.type = Response::PIPE;
- ok.reader = pipe.reader();
-
- HttpConnection http {pipe.writer(), acceptType, UUID::random()};
-
- master->subscribe(http);
-
- return ok;
+ return _getState(principal)
+ .then(defer(master->self(),
+ [this, acceptType]
+ (const mesos::master::Response::GetState& getState)
+ -> Future<Response> {
+ // TODO(zhitao): There is a possible race condition here: if an action
+ // like `taskUpdate()` is queued between `_getState()` and this
+ // continuation, neither the event will be sent to the subscriber
+ // (because the connection is not in subscribers yet), nor
+ // the effect of the change would be captured in the snapshot.
+ Pipe pipe;
+ OK ok;
+
+ ok.headers["Content-Type"] = stringify(acceptType);
+ ok.type = Response::PIPE;
+ ok.reader = pipe.reader();
+
+ HttpConnection http {pipe.writer(), acceptType, UUID::random()};
+ master->subscribe(http);
+
+ mesos::master::Event event;
+ event.set_type(mesos::master::Event::SUBSCRIBED);
+ event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState);
+ http.send<mesos::master::Event, v1::master::Event>(event);
+
+ return ok;
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 393afcf..7cf716d 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -1328,44 +1328,11 @@ TEST_P(MasterAPITest, StartAndStopMaintenance)
// endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events.
TEST_P(MasterAPITest, Subscribe)
{
- Try<Owned<cluster::Master>> master = this->StartMaster();
- ASSERT_SOME(master);
-
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
ContentType contentType = GetParam();
- headers["Accept"] = stringify(contentType);
-
- Future<Response> response = process::http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(Response::PIPE, response.get().type);
- ASSERT_SOME(response->reader);
-
- Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
- Reader<v1::master::Event> decoder(
- Decoder<v1::master::Event>(deserializer), reader);
- Future<Result<v1::master::Event>> event = decoder.read();
-
- EXPECT_TRUE(event.isPending());
+ Try<Owned<cluster::Master>> master = this->StartMaster();
+ ASSERT_SOME(master);
- // Launch a task using the scheduler. This should result in a `TASK_ADDED`
- // event when the task is launched followed by a `TASK_UPDATED` event after
- // the task transitions to running state.
auto scheduler = std::make_shared<MockV1HTTPScheduler>();
auto executor = std::make_shared<MockV1HTTPExecutor>();
@@ -1407,11 +1374,58 @@ TEST_P(MasterAPITest, Subscribe)
AWAIT_READY(subscribed);
+ // Launch a task using the scheduler. This should result in a `TASK_ADDED`
+ // event when the task is launched followed by a `TASK_UPDATED` event after
+ // the task transitions to running state.
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
+ // Create event stream after seeing first offer but before first task is
+ // launched. We should see one framework, one agent and zero task/executor.
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ headers["Accept"] = stringify(contentType);
+
+ Future<Response> response = process::http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(Response::PIPE, response.get().type);
+ ASSERT_SOME(response->reader);
+
+ Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event.get().get().type());
+ const v1::master::Response::GetState& getState =
+ event.get().get().subscribed().get_state();
+
+ EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(1u, getState.get_agents().agents_size());
+ EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+ EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+ event = decoder.read();
+ EXPECT_TRUE(event.isPending());
+
const v1::Offer& offer = offers->offers(0);
TaskInfo task = createTask(internal::devolve(offer), "", executorId);
[3/7] mesos git commit: Refactored 'Master::Http::getAgents()' into
helper function.
Posted by an...@apache.org.
Refactored 'Master::Http::getAgents()' into helper function.
This helper function will be reused by both `GET_AGENTS`
and `GET_STATE` calls.
Review: https://reviews.apache.org/r/49488/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2517ce8f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2517ce8f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2517ce8f
Branch: refs/heads/master
Commit: 2517ce8f4ebf419589b9298b183149d221d9aecc
Parents: a318264
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:43 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 24 ++++++++++++++++++------
src/master/master.hpp | 3 +++
2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2517ce8f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9dee513..2341f15 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2089,12 +2089,25 @@ Future<process::http::Response> Master::Http::getAgents(
{
CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_AGENTS);
+ return _getAgents(principal)
+ .then([contentType](const mesos::master::Response::GetAgents& getAgents)
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_AGENTS);
+ response.mutable_get_agents()->CopyFrom(getAgents);
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ });
+}
+
+Future<mesos::master::Response::GetAgents> Master::Http::_getAgents(
+ const Option<string>& principal) const
+{
+ mesos::master::Response::GetAgents getAgents;
foreachvalue (const Slave* slave, master->slaves.registered) {
- mesos::master::Response::GetAgents::Agent* agent =
- response.mutable_get_agents()->add_agents();
+ mesos::master::Response::GetAgents::Agent* agent = getAgents.add_agents();
agent->mutable_agent_info()->CopyFrom(slave->info);
@@ -2123,8 +2136,7 @@ Future<process::http::Response> Master::Http::getAgents(
}
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return getAgents;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2517ce8f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b9d91c4..09e5105 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1388,6 +1388,9 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
+ process::Future<mesos::master::Response::GetAgents> _getAgents(
+ const Option<std::string>& principal) const;
+
process::Future<process::http::Response> getFlags(
const mesos::master::Call& call,
const Option<std::string>& principal,
[2/7] mesos git commit: Refactored 'Master::Http::getExecutors()'
into helper function.
Posted by an...@apache.org.
Refactored 'Master::Http::getExecutors()' into helper function.
This helper function will be reused by `GetExecutors` and `GetState`.
Review: https://reviews.apache.org/r/49516/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/532f66a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/532f66a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/532f66a3
Branch: refs/heads/master
Commit: 532f66a3a9c98bdb2259091851707cb9e42da0ea
Parents: 3a988e2
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:53 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 32 ++++++++++++++++++++++----------
src/master/master.hpp | 3 +++
2 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/532f66a3/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7020095..7085b07 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1425,6 +1425,23 @@ Future<Response> Master::Http::getExecutors(
{
CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
+ return _getExecutors(principal)
+ .then(
+ [contentType](const mesos::master::Response::GetExecutors& getExecutors)
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_EXECUTORS);
+ response.mutable_get_executors()->CopyFrom(getExecutors);
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ });
+}
+
+
+Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
+ const Option<std::string>& principal) const
+{
// Retrieve `ObjectApprover`s for authorizing frameworks and executors.
Future<Owned<ObjectApprover>> frameworksApprover;
Future<Owned<ObjectApprover>> executorsApprover;
@@ -1448,7 +1465,7 @@ Future<Response> Master::Http::getExecutors(
.then(defer(master->self(),
[=](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
- -> Response {
+ -> Future<mesos::master::Response::GetExecutors> {
// Get approver from tuple.
Owned<ObjectApprover> frameworksApprover;
Owned<ObjectApprover> executorsApprover;
@@ -1475,11 +1492,7 @@ Future<Response> Master::Http::getExecutors(
frameworks.push_back(framework.get());
}
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_EXECUTORS);
-
- mesos::master::Response::GetExecutors* getExecutors =
- response.mutable_get_executors();
+ mesos::master::Response::GetExecutors getExecutors;
foreach (const Framework* framework, frameworks) {
foreachpair (const SlaveID& slaveId,
@@ -1494,7 +1507,7 @@ Future<Response> Master::Http::getExecutors(
}
mesos::master::Response::GetExecutors::Executor* executor =
- getExecutors->add_executors();
+ getExecutors.add_executors();
executor->mutable_executor_info()->CopyFrom(info);
executor->mutable_slave_id()->CopyFrom(slaveId);
@@ -1515,7 +1528,7 @@ Future<Response> Master::Http::getExecutors(
foreachvalue (const ExecutorInfo& info, executors) {
if (!master->frameworks.registered.contains(frameworkId)) {
mesos::master::Response::GetExecutors::Executor* executor =
- getExecutors->add_orphan_executors();
+ getExecutors.add_orphan_executors();
executor->mutable_executor_info()->CopyFrom(info);
executor->mutable_slave_id()->CopyFrom(slave->id);
@@ -1524,8 +1537,7 @@ Future<Response> Master::Http::getExecutors(
}
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return getExecutors;
}));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/532f66a3/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6f2a2b5..de64e3d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1497,6 +1497,9 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
+ process::Future<mesos::master::Response::GetExecutors> _getExecutors(
+ const Option<std::string>& principal) const;
+
Master* master;
// NOTE: The quota specific pieces of the Operator API are factored
[5/7] mesos git commit: Refactored 'master::Http::getFrameworks()' to
helper function.
Posted by an...@apache.org.
Refactored 'master::Http::getFrameworks()' to helper function.
This helper function will be reused by `GET_FRAMEWORKS` and
`GET_STATE` calls.
Review: https://reviews.apache.org/r/49489/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc73420f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc73420f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc73420f
Branch: refs/heads/master
Commit: dc73420f920d948f63e8abddf37d3136969c2c88
Parents: 2517ce8
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:46 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 33 +++++++++++++++++++++++----------
src/master/master.hpp | 3 +++
2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc73420f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 2341f15..7020095 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1346,6 +1346,22 @@ Future<Response> Master::Http::getFrameworks(
{
CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
+ return _getFrameworks(principal)
+ .then([contentType](
+ const mesos::master::Response::GetFrameworks& getFrameworks)
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+ response.mutable_get_frameworks()->CopyFrom(getFrameworks);
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ });
+}
+
+
+Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
+ const Option<string>& principal) const
+{
// Retrieve `ObjectApprover`s for authorizing frameworks.
Future<Owned<ObjectApprover>> frameworksApprover;
@@ -1364,9 +1380,9 @@ Future<Response> Master::Http::getFrameworks(
return frameworksApprover
.then(defer(master->self(),
- [=](const Owned<ObjectApprover>& frameworksApprover) -> Response {
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+ [=](const Owned<ObjectApprover>& frameworksApprover)
+ -> Future<mesos::master::Response::GetFrameworks> {
+ mesos::master::Response::GetFrameworks getFrameworks;
foreachvalue (const Framework* framework,
master->frameworks.registered) {
@@ -1375,8 +1391,7 @@ Future<Response> Master::Http::getFrameworks(
continue;
}
- response.mutable_get_frameworks()->add_frameworks()
- ->CopyFrom(model(*framework));
+ getFrameworks.add_frameworks()->CopyFrom(model(*framework));
}
foreach (const std::shared_ptr<Framework>& framework,
@@ -1386,21 +1401,19 @@ Future<Response> Master::Http::getFrameworks(
continue;
}
- response.mutable_get_frameworks()->add_completed_frameworks()
- ->CopyFrom(model(*framework));
+ getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
}
foreachvalue (const Slave* slave, master->slaves.registered) {
foreachkey (const FrameworkID& frameworkId, slave->tasks) {
if (!master->frameworks.registered.contains(frameworkId)) {
- response.mutable_get_frameworks()->add_unsubscribed_frameworks()
+ getFrameworks.add_unsubscribed_frameworks()
->set_value(frameworkId.value());
}
}
}
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return getFrameworks;
}));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc73420f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 09e5105..6f2a2b5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1489,6 +1489,9 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
+ process::Future<mesos::master::Response::GetFrameworks> _getFrameworks(
+ const Option<std::string>& principal) const;
+
process::Future<process::http::Response> getExecutors(
const mesos::master::Call& call,
const Option<std::string>& principal,
[6/7] mesos git commit: Implemented 'GetState' call in v1 master API.
Posted by an...@apache.org.
Implemented 'GetState' call in v1 master API.
Also created a helper function `_getState()` that will be used
for snapshot of event stream when a client subscribes.
Review: https://reviews.apache.org/r/49517/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3038809e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3038809e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3038809e
Branch: refs/heads/master
Commit: 3038809e38a3b8598e3c33c0e66d9db4552f0d29
Parents: 532f66a
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:56 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 50 ++++++++++++++++-
src/master/master.hpp | 8 +++
src/tests/api_tests.cpp | 131 +++++++++++++++++++++++++++++++++++++++++++
3 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7085b07..10b0572 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -523,7 +523,7 @@ Future<Response> Master::Http::api(
return NotImplemented();
case mesos::master::Call::GET_STATE:
- return NotImplemented();
+ return getState(call, principal, acceptType);
case mesos::master::Call::GET_STATE_SUMMARY:
return NotImplemented();
@@ -1542,6 +1542,54 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
}
+Future<Response> Master::Http::getState(
+ const mesos::master::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
+
+ return _getState(principal)
+ .then([contentType](const mesos::master::Response::GetState& getState)
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_STATE);
+ response.mutable_get_state()->CopyFrom(getState);
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ });
+}
+
+
+Future<mesos::master::Response::GetState> Master::Http::_getState(
+ const Option<string>& principal) const
+{
+ return collect(
+ _getTasks(principal),
+ _getExecutors(principal),
+ _getFrameworks(principal),
+ _getAgents(principal))
+ .then(defer(master->self(),
+ [](const tuple<mesos::master::Response::GetTasks,
+ mesos::master::Response::GetExecutors,
+ mesos::master::Response::GetFrameworks,
+ mesos::master::Response::GetAgents>& results)
+ -> mesos::master::Response::GetState {
+ mesos::master::Response::GetState getState;
+
+ // Use std::get instead of std::tie to avoid
+ // unnecessary copy of large data structs.
+ getState.mutable_get_tasks()->CopyFrom(std::get<0>(results));
+ getState.mutable_get_executors()->CopyFrom(std::get<1>(results));
+ getState.mutable_get_frameworks()->CopyFrom(std::get<2>(results));
+ getState.mutable_get_agents()->CopyFrom(std::get<3>(results));
+
+ return getState;
+ }));
+}
+
+
class Master::Http::FlagsError : public Error
{
public:
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index de64e3d..0688ba3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1500,6 +1500,14 @@ private:
process::Future<mesos::master::Response::GetExecutors> _getExecutors(
const Option<std::string>& principal) const;
+ process::Future<process::http::Response> getState(
+ const mesos::master::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ process::Future<mesos::master::Response::GetState> _getState(
+ const Option<std::string>& principal) const;
+
Master* master;
// NOTE: The quota specific pieces of the Operator API are factored
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index e2d8bf5..393afcf 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -425,6 +425,137 @@ TEST_P(MasterAPITest, GetExecutors)
}
+TEST_P(MasterAPITest, GetState)
+{
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::GET_STATE);
+
+ master::Flags flags = CreateMasterFlags();
+
+ flags.hostname = "localhost";
+ flags.cluster = "test-cluster";
+
+ Try<Owned<cluster::Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ EXPECT_NE(0u, offers.get().size());
+
+ ContentType contentType = GetParam();
+ {
+ // GetState before task launch and check we have one framework, one agent
+ // and zero tasks/executors.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
+ ASSERT_EQ(1u, getState.get_agents().agents_size());
+ ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+ ASSERT_EQ(0u, getState.get_executors().executors_size());
+ }
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+ Future<ExecutorDriver*> execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(FutureArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(execDriver);
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ {
+ // GetState after task launch and check we have a running task.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_tasks().tasks_size());
+ ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
+ }
+
+ Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+ FUTURE_PROTOBUF(
+ StatusUpdateAcknowledgementMessage(),
+ _,
+ Eq(slave.get()->pid));
+
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ // Send a terminal update so that the task transitions to completed.
+ TaskStatus status3;
+ status3.mutable_task_id()->CopyFrom(task.task_id());
+ status3.set_state(TASK_FINISHED);
+
+ execDriver.get()->sendStatusUpdate(status3);
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+ AWAIT_READY(acknowledgement);
+
+ {
+ // GetState after task finished and check we have a completed task.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
+ ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+ }
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+}
+
+
TEST_P(MasterAPITest, GetTasksNoRunningTask)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
[4/7] mesos git commit: Refactored 'Master::Http::getTasks()' into
helper function.
Posted by an...@apache.org.
Refactored 'Master::Http::getTasks()' into helper function.
This helper function will be also be reused for `GetState`.
Review: https://reviews.apache.org/r/49487/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3182645
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3182645
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3182645
Branch: refs/heads/master
Commit: a3182645e01cf9240008f3b84c1142a552600954
Parents: d840f48
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:39 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 35 +++++++++++++++++++++--------------
src/master/master.hpp | 3 +++
2 files changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3182645/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index debedd4..9dee513 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3381,6 +3381,20 @@ Future<Response> Master::Http::getTasks(
{
CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
+ return _getTasks(principal)
+ .then([contentType](const mesos::master::Response::GetTasks& getTasks)
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_TASKS);
+ response.mutable_get_tasks()->CopyFrom(getTasks);
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ });
+}
+
+
+Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
+ const Option<string>& principal) const {
// Retrieve Approvers for authorizing frameworks and tasks.
Future<Owned<ObjectApprover>> frameworksApprover;
Future<Owned<ObjectApprover>> tasksApprover;
@@ -3404,7 +3418,7 @@ Future<Response> Master::Http::getTasks(
.then(defer(master->self(),
[=](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
- -> Future<Response> {
+ -> Future<mesos::master::Response::GetTasks> {
// Get approver from tuple.
Owned<ObjectApprover> frameworksApprover;
Owned<ObjectApprover> tasksApprover;
@@ -3431,12 +3445,7 @@ Future<Response> Master::Http::getTasks(
frameworks.push_back(framework.get());
}
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_TASKS);
-
- mesos::master::Response::GetTasks* getTasks =
- response.mutable_get_tasks();
-
+ mesos::master::Response::GetTasks getTasks;
vector<const Task*> tasks;
foreach (const Framework* framework, frameworks) {
// Pending tasks.
@@ -3449,7 +3458,7 @@ Future<Response> Master::Http::getTasks(
const Task& task =
protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
- getTasks->add_pending_tasks()->CopyFrom(task);
+ getTasks.add_pending_tasks()->CopyFrom(task);
}
// Active tasks.
@@ -3460,7 +3469,7 @@ Future<Response> Master::Http::getTasks(
continue;
}
- getTasks->add_tasks()->CopyFrom(*task);
+ getTasks.add_tasks()->CopyFrom(*task);
}
// Completed tasks.
@@ -3470,7 +3479,7 @@ Future<Response> Master::Http::getTasks(
continue;
}
- getTasks->add_completed_tasks()->CopyFrom(*task);
+ getTasks.add_completed_tasks()->CopyFrom(*task);
}
}
@@ -3486,14 +3495,12 @@ Future<Response> Master::Http::getTasks(
CHECK_NOTNULL(task);
if (!master->frameworks.registered.contains(
task->framework_id())) {
- getTasks->add_orphan_tasks()->CopyFrom(*task);
+ getTasks.add_orphan_tasks()->CopyFrom(*task);
}
}
}
}
-
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
+ return getTasks;
}));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3182645/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index fbacd92..b9d91c4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1458,6 +1458,9 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
+ process::Future<mesos::master::Response::GetTasks> _getTasks(
+ const Option<std::string>& principal) const;
+
process::Future<process::http::Response> createVolumes(
const mesos::master::Call& call,
const Option<std::string>& principal,
[7/7] mesos git commit: Revised protobuf definition of 'GetState'
response.
Posted by an...@apache.org.
Revised protobuf definition of 'GetState' response.
Review: https://reviews.apache.org/r/49509/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a988e2e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a988e2e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a988e2e
Branch: refs/heads/master
Commit: 3a988e2eac02f8f5590e1adced40d10b5360cabe
Parents: dc73420
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
include/mesos/master/master.proto | 7 ++++++-
include/mesos/v1/master/master.proto | 7 ++++++-
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a988e2e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index d06258e..f0c8a56 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -269,8 +269,13 @@ message Response {
repeated bytes data = 1;
}
+ // Contains full state of the master i.e. information about the tasks,
+ // agents, frameworks and executors running in the cluster.
message GetState {
- // TODO(vinod): Fill in the fields.
+ optional GetTasks get_tasks = 1;
+ optional GetExecutors get_executors = 2;
+ optional GetFrameworks get_frameworks = 3;
+ optional GetAgents get_agents = 4;
}
message GetStateSummary {
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a988e2e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index b7cb6fd..0c8cf15 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -270,8 +270,13 @@ message Response {
repeated bytes data = 1;
}
+ // Contains full state of the master i.e. information about the tasks,
+ // agents, frameworks and executors running in the cluster.
message GetState {
- // TODO(vinod): Fill in the fields.
+ optional GetTasks get_tasks = 1;
+ optional GetExecutors get_executors = 2;
+ optional GetFrameworks get_frameworks = 3;
+ optional GetAgents get_agents = 4;
}
message GetStateSummary {