You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/07/06 05:53:35 UTC
[6/7] mesos git commit: Implemented 'GetState' call in v1 master API.
Implemented 'GetState' call in v1 master API.
Also created a helper function `_getState()` that will be used
for the snapshot of the event stream when a client subscribes.
Review: https://reviews.apache.org/r/49517/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3038809e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3038809e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3038809e
Branch: refs/heads/master
Commit: 3038809e38a3b8598e3c33c0e66d9db4552f0d29
Parents: 532f66a
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:56 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700
----------------------------------------------------------------------
src/master/http.cpp | 50 ++++++++++++++++-
src/master/master.hpp | 8 +++
src/tests/api_tests.cpp | 131 +++++++++++++++++++++++++++++++++++++++++++
3 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7085b07..10b0572 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -523,7 +523,7 @@ Future<Response> Master::Http::api(
return NotImplemented();
case mesos::master::Call::GET_STATE:
- return NotImplemented();
+ return getState(call, principal, acceptType);
case mesos::master::Call::GET_STATE_SUMMARY:
return NotImplemented();
@@ -1542,6 +1542,54 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
}
+// Serves the v1 master API `GET_STATE` call: obtains the combined state
+// from `_getState()` and answers with an `OK` response whose body is that
+// state serialized in the requested content type.
+Future<Response> Master::Http::getState(
+ const mesos::master::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
+
+ return _getState(principal)
+ .then([contentType](const mesos::master::Response::GetState& state)
+ -> Future<Response> {
+ // Wrap the state in a typed `Response` envelope before evolving
+ // and serializing it.
+ mesos::master::Response envelope;
+ envelope.mutable_get_state()->CopyFrom(state);
+ envelope.set_type(mesos::master::Response::GET_STATE);
+
+ return OK(serialize(contentType, evolve(envelope)),
+ stringify(contentType));
+ });
+}
+
+
+// Builds the aggregate `GetState` message for `principal` by collecting
+// the results of `_getTasks()`, `_getExecutors()`, `_getFrameworks()` and
+// `_getAgents()`.
+Future<mesos::master::Response::GetState> Master::Http::_getState(
+ const Option<string>& principal) const
+{
+ // Element order matches the argument order passed to `collect()` below.
+ using Parts = tuple<mesos::master::Response::GetTasks,
+ mesos::master::Response::GetExecutors,
+ mesos::master::Response::GetFrameworks,
+ mesos::master::Response::GetAgents>;
+
+ return collect(
+ _getTasks(principal),
+ _getExecutors(principal),
+ _getFrameworks(principal),
+ _getAgents(principal))
+ .then(defer(master->self(),
+ [](const Parts& parts) -> mesos::master::Response::GetState {
+ mesos::master::Response::GetState state;
+
+ // Read each element through std::get rather than unpacking with
+ // std::tie, to avoid an unnecessary copy of large data structs.
+ state.mutable_get_agents()->CopyFrom(std::get<3>(parts));
+ state.mutable_get_frameworks()->CopyFrom(std::get<2>(parts));
+ state.mutable_get_executors()->CopyFrom(std::get<1>(parts));
+ state.mutable_get_tasks()->CopyFrom(std::get<0>(parts));
+
+ return state;
+ }));
+}
+
+
class Master::Http::FlagsError : public Error
{
public:
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index de64e3d..0688ba3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1500,6 +1500,14 @@ private:
process::Future<mesos::master::Response::GetExecutors> _getExecutors(
const Option<std::string>& principal) const;
+ // Handler for the v1 master API `GET_STATE` call. The response body
+ // is serialized according to `contentType`.
+ process::Future<process::http::Response> getState(
+ const mesos::master::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ // Helper that assembles the `GetState` message from the results of
+ // `_getTasks`, `_getExecutors`, `_getFrameworks` and `_getAgents`.
+ process::Future<mesos::master::Response::GetState> _getState(
+ const Option<std::string>& principal) const;
+
Master* master;
// NOTE: The quota specific pieces of the Operator API are factored
http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index e2d8bf5..393afcf 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -425,6 +425,137 @@ TEST_P(MasterAPITest, GetExecutors)
}
+// Verifies the `GET_STATE` call at three points in a task's life:
+// before any task is launched, while one task is running, and after
+// that task has finished.
+TEST_P(MasterAPITest, GetState)
+{
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::GET_STATE);
+
+ master::Flags flags = CreateMasterFlags();
+
+ flags.hostname = "localhost";
+ flags.cluster = "test-cluster";
+
+ Try<Owned<cluster::Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ // Wait for the offers with a bounded await before reading them, and
+ // abort the test if none arrived: `offers.get()[0]` is indexed below.
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ ContentType contentType = GetParam();
+ {
+ // GetState before task launch and check we have one framework, one agent
+ // and zero tasks/executors.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
+ ASSERT_EQ(1u, getState.get_agents().agents_size());
+ ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+ ASSERT_EQ(0u, getState.get_executors().executors_size());
+ }
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+ Future<ExecutorDriver*> execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(FutureArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(execDriver);
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ {
+ // GetState after task launch and check we have a running task.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_tasks().tasks_size());
+ ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
+ }
+
+ Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+ FUTURE_PROTOBUF(
+ StatusUpdateAcknowledgementMessage(),
+ _,
+ Eq(slave.get()->pid));
+
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ // Send a terminal update so that the task transitions to completed.
+ TaskStatus status3;
+ status3.mutable_task_id()->CopyFrom(task.task_id());
+ status3.set_state(TASK_FINISHED);
+
+ execDriver.get()->sendStatusUpdate(status3);
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+ AWAIT_READY(acknowledgement);
+
+ {
+ // GetState after task finished and check we have a completed task.
+ Future<v1::master::Response> v1Response =
+ post(master.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+ const v1::master::Response::GetState& getState = v1Response->get_state();
+ ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
+ ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+ }
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+}
+
+
TEST_P(MasterAPITest, GetTasksNoRunningTask)
{
Try<Owned<cluster::Master>> master = this->StartMaster();