You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/07/06 05:53:30 UTC

[1/7] mesos git commit: Implemented 'Subscribed' event for v1 master event stream.

Repository: mesos
Updated Branches:
  refs/heads/master d840f48f9 -> d66f0b1e6


Implemented 'Subscribed' event for v1 master event stream.

This change adds logic for sending a `Subscribed` event containing
the present master state when a client subscribes to the event stream.

Review: https://reviews.apache.org/r/49518/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d66f0b1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d66f0b1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d66f0b1e

Branch: refs/heads/master
Commit: d66f0b1e67f1fea50265d5026a1d89f7dcf85a7d
Parents: 3038809
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:26:00 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 17 +++++--
 include/mesos/v1/master/master.proto | 17 +++++--
 src/master/http.cpp                  | 39 +++++++++-----
 src/tests/api_tests.cpp              | 84 ++++++++++++++++++-------------
 4 files changed, 102 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index f0c8a56..6aedc2f 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -435,12 +435,20 @@ message Response {
 message Event {
   enum Type {
     UNKNOWN = 0;
-    TASK_ADDED = 1; // See `TaskAdded` below.
-    TASK_UPDATED = 2; // See `TaskUpdated` below.
+    SUBSCRIBED = 1; // See `Subscribed` below.
+    TASK_ADDED = 2; // See `TaskAdded` below.
+    TASK_UPDATED = 3; // See `TaskUpdated` below.
 
     // TODO(vinod): Fill in more events.
   }
 
+  // First event received when a client subscribes.
+  message Subscribed {
+    // Snapshot of the entire cluster state. Further updates to the
+    // cluster state are sent as separate events on the stream.
+    optional Response.GetState get_state = 1;
+  }
+
   // Forwarded by the master when a task becomes known to it. This can happen
   // when a new task is launched by the scheduler or when the task becomes
   // known to the master upon an agent (re-)registration after a failover.
@@ -459,6 +467,7 @@ message Event {
 
   optional Type type = 1;
 
-  optional TaskAdded task_added = 2;
-  optional TaskUpdated task_updated = 3;
+  optional Subscribed subscribed = 2;
+  optional TaskAdded task_added = 3;
+  optional TaskUpdated task_updated = 4;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 0c8cf15..19dbd1a 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -436,12 +436,20 @@ message Response {
 message Event {
   enum Type {
     UNKNOWN = 0;
-    TASK_ADDED = 1; // See `TaskAdded` below.
-    TASK_UPDATED = 2; // See `TaskUpdated` below.
+    SUBSCRIBED = 1; // See `Subscribed` below.
+    TASK_ADDED = 2; // See `TaskAdded` below.
+    TASK_UPDATED = 3; // See `TaskUpdated` below.
 
     // TODO(vinod): Fill in more events.
   }
 
+  // First event received when a client subscribes.
+  message Subscribed {
+    // Snapshot of the entire cluster state. Further updates to the
+    // cluster state are sent as separate events on the stream.
+    optional Response.GetState get_state = 1;
+  }
+
   // Forwarded by the master when a task becomes known to it. This can happen
   // when a new task is launched by the scheduler or when the task becomes
   // known to the master upon an agent (re-)registration after a failover.
@@ -460,6 +468,7 @@ message Event {
 
   optional Type type = 1;
 
-  optional TaskAdded task_added = 2;
-  optional TaskUpdated task_updated = 3;
+  optional Subscribed subscribed = 2;
+  optional TaskAdded task_added = 3;
+  optional TaskUpdated task_updated = 4;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 10b0572..3640486 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -589,18 +589,33 @@ Future<Response> Master::Http::api(
       return quotaHandler.remove(call, principal);
 
     case mesos::master::Call::SUBSCRIBE: {
-      Pipe pipe;
-      OK ok;
-
-      ok.headers["Content-Type"] = stringify(acceptType);
-      ok.type = Response::PIPE;
-      ok.reader = pipe.reader();
-
-      HttpConnection http {pipe.writer(), acceptType, UUID::random()};
-
-      master->subscribe(http);
-
-      return ok;
+      return _getState(principal)
+        .then(defer(master->self(),
+            [this, acceptType]
+            (const mesos::master::Response::GetState& getState)
+              -> Future<Response> {
+          // TODO(zhitao): There is a possible race condition here: if an action
+          // like `taskUpdate()` is queued between `_getState()` and this
+          // continuation, the event will not be sent to the subscriber
+          // (because the connection is not in subscribers yet), and the
+          // effect of the change will not be captured in the snapshot either.
+          Pipe pipe;
+          OK ok;
+
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = Response::PIPE;
+          ok.reader = pipe.reader();
+
+          HttpConnection http {pipe.writer(), acceptType, UUID::random()};
+          master->subscribe(http);
+
+          mesos::master::Event event;
+          event.set_type(mesos::master::Event::SUBSCRIBED);
+          event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState);
+          http.send<mesos::master::Event, v1::master::Event>(event);
+
+          return ok;
+        }));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d66f0b1e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 393afcf..7cf716d 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -1328,44 +1328,11 @@ TEST_P(MasterAPITest, StartAndStopMaintenance)
 // endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events.
 TEST_P(MasterAPITest, Subscribe)
 {
-  Try<Owned<cluster::Master>> master = this->StartMaster();
-  ASSERT_SOME(master);
-
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
   ContentType contentType = GetParam();
-  headers["Accept"] = stringify(contentType);
-
-  Future<Response> response = process::http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(Response::PIPE, response.get().type);
-  ASSERT_SOME(response->reader);
-
-  Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
 
-  Future<Result<v1::master::Event>> event = decoder.read();
-
-  EXPECT_TRUE(event.isPending());
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
 
-  // Launch a task using the scheduler. This should result in a `TASK_ADDED`
-  // event when the task is launched followed by a `TASK_UPDATED` event after
-  // the task transitions to running state.
   auto scheduler = std::make_shared<MockV1HTTPScheduler>();
   auto executor = std::make_shared<MockV1HTTPExecutor>();
 
@@ -1407,11 +1374,58 @@ TEST_P(MasterAPITest, Subscribe)
 
   AWAIT_READY(subscribed);
 
+  // Launch a task using the scheduler. This should result in a `TASK_ADDED`
+  // event when the task is launched followed by a `TASK_UPDATED` event after
+  // the task transitions to running state.
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
   AWAIT_READY(offers);
   EXPECT_NE(0, offers->offers().size());
 
+  // Create event stream after seeing first offer but before first task is
+  // launched. We should see one framework, one agent and zero tasks/executors.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<Response> response = process::http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(Response::PIPE, response.get().type);
+  ASSERT_SOME(response->reader);
+
+  Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event.get().get().type());
+  const v1::master::Response::GetState& getState =
+      event.get().get().subscribed().get_state();
+
+  EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(1u, getState.get_agents().agents_size());
+  EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+  EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+  event = decoder.read();
+  EXPECT_TRUE(event.isPending());
+
   const v1::Offer& offer = offers->offers(0);
 
   TaskInfo task = createTask(internal::devolve(offer), "", executorId);


[3/7] mesos git commit: Refactored 'Master::Http::getAgents()' into helper function.

Posted by an...@apache.org.
Refactored 'Master::Http::getAgents()' into helper function.

This helper function will be reused by both `GET_AGENTS`
and `GET_STATE` calls.

Review: https://reviews.apache.org/r/49488/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2517ce8f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2517ce8f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2517ce8f

Branch: refs/heads/master
Commit: 2517ce8f4ebf419589b9298b183149d221d9aecc
Parents: a318264
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:43 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp   | 24 ++++++++++++++++++------
 src/master/master.hpp |  3 +++
 2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2517ce8f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9dee513..2341f15 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2089,12 +2089,25 @@ Future<process::http::Response> Master::Http::getAgents(
 {
   CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
 
-  mesos::master::Response response;
-  response.set_type(mesos::master::Response::GET_AGENTS);
+  return _getAgents(principal)
+    .then([contentType](const mesos::master::Response::GetAgents& getAgents)
+      -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_AGENTS);
+      response.mutable_get_agents()->CopyFrom(getAgents);
+
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+  });
+}
+
 
+Future<mesos::master::Response::GetAgents> Master::Http::_getAgents(
+    const Option<string>& principal) const
+{
+  mesos::master::Response::GetAgents getAgents;
   foreachvalue (const Slave* slave, master->slaves.registered) {
-    mesos::master::Response::GetAgents::Agent* agent =
-        response.mutable_get_agents()->add_agents();
+    mesos::master::Response::GetAgents::Agent* agent = getAgents.add_agents();
 
     agent->mutable_agent_info()->CopyFrom(slave->info);
 
@@ -2123,8 +2136,7 @@ Future<process::http::Response> Master::Http::getAgents(
     }
   }
 
-  return OK(serialize(contentType, evolve(response)),
-            stringify(contentType));
+  return getAgents;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2517ce8f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b9d91c4..09e5105 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1388,6 +1388,9 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
+    process::Future<mesos::master::Response::GetAgents> _getAgents(
+        const Option<std::string>& principal) const;
+
     process::Future<process::http::Response> getFlags(
         const mesos::master::Call& call,
         const Option<std::string>& principal,


[2/7] mesos git commit: Refactored 'Master::Http::getExecutors()' into helper function.

Posted by an...@apache.org.
Refactored 'Master::Http::getExecutors()' into helper function.

This helper function will be reused by `GetExecutors` and `GetState`.

Review: https://reviews.apache.org/r/49516/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/532f66a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/532f66a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/532f66a3

Branch: refs/heads/master
Commit: 532f66a3a9c98bdb2259091851707cb9e42da0ea
Parents: 3a988e2
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:53 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp   | 32 ++++++++++++++++++++++----------
 src/master/master.hpp |  3 +++
 2 files changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/532f66a3/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7020095..7085b07 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1425,6 +1425,23 @@ Future<Response> Master::Http::getExecutors(
 {
   CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
 
+  return _getExecutors(principal)
+    .then(
+      [contentType](const mesos::master::Response::GetExecutors& getExecutors)
+        -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_EXECUTORS);
+      response.mutable_get_executors()->CopyFrom(getExecutors);
+
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    });
+}
+
+
+Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
+    const Option<std::string>& principal) const
+{
   // Retrieve `ObjectApprover`s for authorizing frameworks and executors.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> executorsApprover;
@@ -1448,7 +1465,7 @@ Future<Response> Master::Http::getExecutors(
     .then(defer(master->self(),
         [=](const tuple<Owned<ObjectApprover>,
                         Owned<ObjectApprover>>& approvers)
-          -> Response {
+          -> Future<mesos::master::Response::GetExecutors> {
       // Get approver from tuple.
       Owned<ObjectApprover> frameworksApprover;
       Owned<ObjectApprover> executorsApprover;
@@ -1475,11 +1492,7 @@ Future<Response> Master::Http::getExecutors(
         frameworks.push_back(framework.get());
       }
 
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_EXECUTORS);
-
-      mesos::master::Response::GetExecutors* getExecutors =
-        response.mutable_get_executors();
+      mesos::master::Response::GetExecutors getExecutors;
 
       foreach (const Framework* framework, frameworks) {
         foreachpair (const SlaveID& slaveId,
@@ -1494,7 +1507,7 @@ Future<Response> Master::Http::getExecutors(
             }
 
             mesos::master::Response::GetExecutors::Executor* executor =
-              getExecutors->add_executors();
+              getExecutors.add_executors();
 
             executor->mutable_executor_info()->CopyFrom(info);
             executor->mutable_slave_id()->CopyFrom(slaveId);
@@ -1515,7 +1528,7 @@ Future<Response> Master::Http::getExecutors(
           foreachvalue (const ExecutorInfo& info, executors) {
             if (!master->frameworks.registered.contains(frameworkId)) {
               mesos::master::Response::GetExecutors::Executor* executor =
-                getExecutors->add_orphan_executors();
+                getExecutors.add_orphan_executors();
 
               executor->mutable_executor_info()->CopyFrom(info);
               executor->mutable_slave_id()->CopyFrom(slave->id);
@@ -1524,8 +1537,7 @@ Future<Response> Master::Http::getExecutors(
         }
       }
 
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
+      return getExecutors;
     }));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/532f66a3/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6f2a2b5..de64e3d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1497,6 +1497,9 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
+    process::Future<mesos::master::Response::GetExecutors> _getExecutors(
+        const Option<std::string>& principal) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored


[5/7] mesos git commit: Refactored 'Master::Http::getFrameworks()' to helper function.

Posted by an...@apache.org.
Refactored 'Master::Http::getFrameworks()' to helper function.

This helper function will be reused by `GET_FRAMEWORKS` and
`GET_STATE` calls.

Review: https://reviews.apache.org/r/49489/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc73420f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc73420f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc73420f

Branch: refs/heads/master
Commit: dc73420f920d948f63e8abddf37d3136969c2c88
Parents: 2517ce8
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:46 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp   | 33 +++++++++++++++++++++++----------
 src/master/master.hpp |  3 +++
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dc73420f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 2341f15..7020095 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1346,6 +1346,22 @@ Future<Response> Master::Http::getFrameworks(
 {
   CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
 
+  return _getFrameworks(principal)
+    .then([contentType](
+        const mesos::master::Response::GetFrameworks& getFrameworks)
+          -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+      response.mutable_get_frameworks()->CopyFrom(getFrameworks);
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    });
+}
+
+
+Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
+    const Option<string>& principal) const
+{
   // Retrieve `ObjectApprover`s for authorizing frameworks.
   Future<Owned<ObjectApprover>> frameworksApprover;
 
@@ -1364,9 +1380,9 @@ Future<Response> Master::Http::getFrameworks(
 
   return frameworksApprover
     .then(defer(master->self(),
-        [=](const Owned<ObjectApprover>& frameworksApprover) -> Response {
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+        [=](const Owned<ObjectApprover>& frameworksApprover)
+          -> Future<mesos::master::Response::GetFrameworks> {
+      mesos::master::Response::GetFrameworks getFrameworks;
 
       foreachvalue (const Framework* framework,
                     master->frameworks.registered) {
@@ -1375,8 +1391,7 @@ Future<Response> Master::Http::getFrameworks(
           continue;
         }
 
-        response.mutable_get_frameworks()->add_frameworks()
-            ->CopyFrom(model(*framework));
+        getFrameworks.add_frameworks()->CopyFrom(model(*framework));
       }
 
       foreach (const std::shared_ptr<Framework>& framework,
@@ -1386,21 +1401,19 @@ Future<Response> Master::Http::getFrameworks(
           continue;
         }
 
-        response.mutable_get_frameworks()->add_completed_frameworks()
-            ->CopyFrom(model(*framework));
+        getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
       }
 
       foreachvalue (const Slave* slave, master->slaves.registered) {
         foreachkey (const FrameworkID& frameworkId, slave->tasks) {
           if (!master->frameworks.registered.contains(frameworkId)) {
-            response.mutable_get_frameworks()->add_unsubscribed_frameworks()
+            getFrameworks.add_unsubscribed_frameworks()
                 ->set_value(frameworkId.value());
           }
         }
       }
 
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
+      return getFrameworks;
     }));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/dc73420f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 09e5105..6f2a2b5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1489,6 +1489,9 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
+    process::Future<mesos::master::Response::GetFrameworks> _getFrameworks(
+        const Option<std::string>& principal) const;
+
     process::Future<process::http::Response> getExecutors(
         const mesos::master::Call& call,
         const Option<std::string>& principal,


[6/7] mesos git commit: Implemented 'GetState' call in v1 master API.

Posted by an...@apache.org.
Implemented 'GetState' call in v1 master API.

Also created a helper function `_getState()` that will be used
for snapshot of event stream when a client subscribes.

Review: https://reviews.apache.org/r/49517/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3038809e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3038809e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3038809e

Branch: refs/heads/master
Commit: 3038809e38a3b8598e3c33c0e66d9db4552f0d29
Parents: 532f66a
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:56 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp     |  50 ++++++++++++++++-
 src/master/master.hpp   |   8 +++
 src/tests/api_tests.cpp | 131 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7085b07..10b0572 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -523,7 +523,7 @@ Future<Response> Master::Http::api(
       return NotImplemented();
 
     case mesos::master::Call::GET_STATE:
-      return NotImplemented();
+      return getState(call, principal, acceptType);
 
     case mesos::master::Call::GET_STATE_SUMMARY:
       return NotImplemented();
@@ -1542,6 +1542,54 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
 }
 
 
+Future<Response> Master::Http::getState(
+    const mesos::master::Call& call,
+    const Option<string>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
+
+  return _getState(principal)
+    .then([contentType](const mesos::master::Response::GetState& getState)
+      -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_STATE);
+      response.mutable_get_state()->CopyFrom(getState);
+
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+    });
+}
+
+
+Future<mesos::master::Response::GetState> Master::Http::_getState(
+    const Option<string>& principal) const
+{
+  return collect(
+      _getTasks(principal),
+      _getExecutors(principal),
+      _getFrameworks(principal),
+      _getAgents(principal))
+    .then(defer(master->self(),
+      [](const tuple<mesos::master::Response::GetTasks,
+                     mesos::master::Response::GetExecutors,
+                     mesos::master::Response::GetFrameworks,
+                     mesos::master::Response::GetAgents>& results)
+        -> mesos::master::Response::GetState {
+      mesos::master::Response::GetState getState;
+
+      // Use std::get instead of std::tie to avoid
+      // unnecessary copy of large data structs.
+      getState.mutable_get_tasks()->CopyFrom(std::get<0>(results));
+      getState.mutable_get_executors()->CopyFrom(std::get<1>(results));
+      getState.mutable_get_frameworks()->CopyFrom(std::get<2>(results));
+      getState.mutable_get_agents()->CopyFrom(std::get<3>(results));
+
+      return getState;
+    }));
+}
+
+
 class Master::Http::FlagsError : public Error
 {
 public:

http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index de64e3d..0688ba3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1500,6 +1500,14 @@ private:
     process::Future<mesos::master::Response::GetExecutors> _getExecutors(
         const Option<std::string>& principal) const;
 
+    process::Future<process::http::Response> getState(
+        const mesos::master::Call& call,
+        const Option<std::string>& principal,
+        ContentType contentType) const;
+
+    process::Future<mesos::master::Response::GetState> _getState(
+        const Option<std::string>& principal) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored

http://git-wip-us.apache.org/repos/asf/mesos/blob/3038809e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index e2d8bf5..393afcf 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -425,6 +425,137 @@ TEST_P(MasterAPITest, GetExecutors)
 }
 
 
+TEST_P(MasterAPITest, GetState)
+{
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::GET_STATE);
+
+  master::Flags flags = CreateMasterFlags();
+
+  flags.hostname = "localhost";
+  flags.cluster = "test-cluster";
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  EXPECT_NE(0u, offers.get().size());
+
+  ContentType contentType = GetParam();
+  {
+    // GetState before task launch and check we have one framework, one agent
+    // and zero tasks/executors.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
+    ASSERT_EQ(1u, getState.get_agents().agents_size());
+    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+    ASSERT_EQ(0u, getState.get_executors().executors_size());
+  }
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  Future<ExecutorDriver*> execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(FutureArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(execDriver);
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  {
+    // GetState after task launch and check we have a running task.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_tasks().tasks_size());
+    ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
+  }
+
+  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
+    FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(),
+        _,
+        Eq(slave.get()->pid));
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  // Send a terminal update so that the task transitions to completed.
+  TaskStatus status3;
+  status3.mutable_task_id()->CopyFrom(task.task_id());
+  status3.set_state(TASK_FINISHED);
+
+  execDriver.get()->sendStatusUpdate(status3);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  AWAIT_READY(acknowledgement);
+
+  {
+    // GetState after task finished and check we have a completed task.
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
+  }
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_P(MasterAPITest, GetTasksNoRunningTask)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();


[4/7] mesos git commit: Refactored 'Master::Http::getTasks()' into helper function.

Posted by an...@apache.org.
Refactored 'Master::Http::getTasks()' into helper function.

This helper function will also be reused for `GetState`.

Review: https://reviews.apache.org/r/49487/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3182645
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3182645
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3182645

Branch: refs/heads/master
Commit: a3182645e01cf9240008f3b84c1142a552600954
Parents: d840f48
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:39 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp   | 35 +++++++++++++++++++++--------------
 src/master/master.hpp |  3 +++
 2 files changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a3182645/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index debedd4..9dee513 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3381,6 +3381,20 @@ Future<Response> Master::Http::getTasks(
 {
   CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
 
+  return _getTasks(principal)
+    .then([contentType](const mesos::master::Response::GetTasks& getTasks)
+        -> Future<Response> {
+      mesos::master::Response response;
+      response.set_type(mesos::master::Response::GET_TASKS);
+      response.mutable_get_tasks()->CopyFrom(getTasks);
+      return OK(serialize(contentType, evolve(response)),
+                stringify(contentType));
+  });
+}
+
+
+Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
+    const Option<string>& principal) const {
   // Retrieve Approvers for authorizing frameworks and tasks.
   Future<Owned<ObjectApprover>> frameworksApprover;
   Future<Owned<ObjectApprover>> tasksApprover;
@@ -3404,7 +3418,7 @@ Future<Response> Master::Http::getTasks(
     .then(defer(master->self(),
       [=](const tuple<Owned<ObjectApprover>,
                       Owned<ObjectApprover>>& approvers)
-        -> Future<Response> {
+        -> Future<mesos::master::Response::GetTasks> {
       // Get approver from tuple.
       Owned<ObjectApprover> frameworksApprover;
       Owned<ObjectApprover> tasksApprover;
@@ -3431,12 +3445,7 @@ Future<Response> Master::Http::getTasks(
         frameworks.push_back(framework.get());
       }
 
-      mesos::master::Response response;
-      response.set_type(mesos::master::Response::GET_TASKS);
-
-      mesos::master::Response::GetTasks* getTasks =
-        response.mutable_get_tasks();
-
+      mesos::master::Response::GetTasks getTasks;
       vector<const Task*> tasks;
       foreach (const Framework* framework, frameworks) {
         // Pending tasks.
@@ -3449,7 +3458,7 @@ Future<Response> Master::Http::getTasks(
           const Task& task =
             protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
 
-          getTasks->add_pending_tasks()->CopyFrom(task);
+          getTasks.add_pending_tasks()->CopyFrom(task);
         }
 
         // Active tasks.
@@ -3460,7 +3469,7 @@ Future<Response> Master::Http::getTasks(
             continue;
           }
 
-          getTasks->add_tasks()->CopyFrom(*task);
+          getTasks.add_tasks()->CopyFrom(*task);
         }
 
         // Completed tasks.
@@ -3470,7 +3479,7 @@ Future<Response> Master::Http::getTasks(
             continue;
           }
 
-          getTasks->add_completed_tasks()->CopyFrom(*task);
+          getTasks.add_completed_tasks()->CopyFrom(*task);
         }
       }
 
@@ -3486,14 +3495,12 @@ Future<Response> Master::Http::getTasks(
             CHECK_NOTNULL(task);
             if (!master->frameworks.registered.contains(
                 task->framework_id())) {
-              getTasks->add_orphan_tasks()->CopyFrom(*task);
+              getTasks.add_orphan_tasks()->CopyFrom(*task);
             }
           }
         }
       }
-
-      return OK(serialize(contentType, evolve(response)),
-                stringify(contentType));
+      return getTasks;
   }));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a3182645/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index fbacd92..b9d91c4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1458,6 +1458,9 @@ private:
         const Option<std::string>& principal,
         ContentType contentType) const;
 
+    process::Future<mesos::master::Response::GetTasks> _getTasks(
+        const Option<std::string>& principal) const;
+
     process::Future<process::http::Response> createVolumes(
         const mesos::master::Call& call,
         const Option<std::string>& principal,


[7/7] mesos git commit: Revised protobuf definition of 'GetState' response.

Posted by an...@apache.org.
Revised protobuf definition of 'GetState' response.

Review: https://reviews.apache.org/r/49509/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a988e2e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a988e2e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a988e2e

Branch: refs/heads/master
Commit: 3a988e2eac02f8f5590e1adced40d10b5360cabe
Parents: dc73420
Author: Zhitao Li <zh...@gmail.com>
Authored: Tue Jul 5 21:25:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Jul 5 22:32:57 2016 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 7 ++++++-
 include/mesos/v1/master/master.proto | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3a988e2e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index d06258e..f0c8a56 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -269,8 +269,13 @@ message Response {
     repeated bytes data = 1;
   }
 
+  // Contains full state of the master i.e. information about the tasks,
+  // agents, frameworks and executors running in the cluster.
   message GetState {
-    // TODO(vinod): Fill in the fields.
+    optional GetTasks get_tasks = 1;
+    optional GetExecutors get_executors = 2;
+    optional GetFrameworks get_frameworks = 3;
+    optional GetAgents get_agents = 4;
   }
 
   message GetStateSummary {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3a988e2e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index b7cb6fd..0c8cf15 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -270,8 +270,13 @@ message Response {
     repeated bytes data = 1;
   }
 
+  // Contains full state of the master i.e. information about the tasks,
+  // agents, frameworks and executors running in the cluster.
   message GetState {
-    // TODO(vinod): Fill in the fields.
+    optional GetTasks get_tasks = 1;
+    optional GetExecutors get_executors = 2;
+    optional GetFrameworks get_frameworks = 3;
+    optional GetAgents get_agents = 4;
   }
 
   message GetStateSummary {