You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/07/06 05:53:35 UTC

[6/7] mesos git commit: Implemented 'GetState' call in v1 master API.

Implemented 'GetState' call in v1 master API.

Also created a helper function `_getState()` that will be used
for snapshot of event stream when a client subscribes.

Review: https://reviews.apache.org/r/49517/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3038809e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3038809e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3038809e

Branch: refs/heads/master
Commit: 3038809e38a3b8598e3c33c0e66d9db4552f0d29
Parents: 532f66a
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:56 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp     |  50 ++++++++++++++++-
 src/master/master.hpp   |   8 +++
 src/tests/api_tests.cpp | 131 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7085b07..10b0572 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -523,7 +523,7 @@ Future<Response> Master::Http::api(
       return NotImplemented();
 
     case mesos::master::Call::GET_STATE:
-      return NotImplemented();
+      return getState(call, principal, acceptType);
 
     case mesos::master::Call::GET_STATE_SUMMARY:
       return NotImplemented();
@@ -1542,6 +1542,54 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
 }
 
 
+Future<Response> Master::Http::getState(
+    const mesos::master::Call& call,
+    const Option<string>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
+
+  return _getState(principal)
+    .then([contentType](const mesos::master::Response::GetState& getState)
+      -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_STATE);
+      response.mutable_get_state()->CopyFrom(getState);
+
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    });
+}
+
+
+Future<mesos::master::Response::GetState> Master::Http::_getState(
+    const Option<string>& principal) const
+{
+  return collect(
+      _getTasks(principal),
+      _getExecutors(principal),
+      _getFrameworks(principal),
+      _getAgents(principal))
+    .then(defer(master->self(),
+      [](const tuple<mesos::master::Response::GetTasks,
+                     mesos::master::Response::GetExecutors,
+                     mesos::master::Response::GetFrameworks,
+                     mesos::master::Response::GetAgents>& results)
+        -> mesos::master::Response::GetState {
+      mesos::master::Response::GetState getState;
+
+      // Use std::get instead of std::tie to avoid
+      // unnecessary copy of large data structs.
+      getState.mutable_get_tasks()->CopyFrom(std::get<0>(results));
+      getState.mutable_get_executors()->CopyFrom(std::get<1>(results));
+      getState.mutable_get_frameworks()->CopyFrom(std::get<2>(results));
+      getState.mutable_get_agents()->CopyFrom(std::get<3>(results));
+
+      return getState;
+    }));
+}
+
+
 class Master::Http::FlagsError : public Error
 {
 public:

http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index de64e3d..0688ba3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1500,6 +1500,14 @@ private:
     process::Future<mesos::master::Response::GetExecutors> _getExecutors(
         const Option<std::string>& principal) const;
 
+    process::Future<process::http::Response> getState(
+        const mesos::master::Call& call,
+        const Option<std::string>& principal,
+        ContentType contentType) const;
+
+    process::Future<mesos::master::Response::GetState> _getState(
+        const Option<std::string>& principal) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored

http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index e2d8bf5..393afcf 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -425,6 +425,137 @@ TEST_P(MasterAPITest, GetExecutors)
 }
 
 
+TEST_P(MasterAPITest, GetState)
+{
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::GET_STATE);
+
+  master::Flags flags = CreateMasterFlags();
+
+  flags.hostname = "localhost";
+  flags.cluster = "test-cluster";
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  EXPECT_NE(0u, offers.get().size());
+
+  ContentType contentType = GetParam();
+  {
+    // GetState before task launch and check we have one framework, one agent
+    // and zero tasks/executors.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
+    ASSERT_EQ(1u, getState.get_agents().agents_size());
+    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+    ASSERT_EQ(0u, getState.get_executors().executors_size());
+  }
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  Future<ExecutorDriver*> execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(FutureArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(execDriver);
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  {
+    // GetState after task launch and check we have a running task.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_tasks().tasks_size());
+    ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
+  }
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(),
+        _,
+        Eq(slave.get()->pid));
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  // Send a terminal update so that the task transitions to completed.
+  TaskStatus status3;
+  status3.mutable_task_id()->CopyFrom(task.task_id());
+  status3.set_state(TASK_FINISHED);
+
+  execDriver.get()->sendStatusUpdate(status3);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  AWAIT_READY(acknowledgement);
+
+  {
+    // GetState after task finished and check we have a completed task.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+  }
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_P(MasterAPITest, GetTasksNoRunningTask)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();