You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/07/13 19:10:56 UTC
[1/8] mesos git commit: Implemented 'GetFrameworks' call in v1 agent
API.
Repository: mesos
Updated Branches:
refs/heads/master e6444713d -> b413918be
Implemented 'GetFrameworks' call in v1 agent API.
Review: https://reviews.apache.org/r/49757/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e71233ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e71233ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e71233ca
Branch: refs/heads/master
Commit: e71233cae0630e819cdbc0333c6518074174bd19
Parents: e644471
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:05 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:05 2016 -0700
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 18 ++++++++-
include/mesos/v1/agent/agent.proto | 18 ++++++++-
src/slave/http.cpp | 69 +++++++++++++++++++++++++++++++++
src/slave/slave.hpp | 8 ++++
src/slave/validation.cpp | 3 ++
5 files changed, 112 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e71233ca/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index cfd117d..7acb2d4 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -33,7 +33,7 @@ message Call {
// included in the corresponding `Call::Foo` message. Similarly, if a call
// receives a synchronous response it will be returned as a `Response`
// message of type `Response::FOO`.
- enum Type {
+ enum Type {
UNKNOWN = 0;
GET_HEALTH = 1; // Retrieves the agent's health status.
@@ -50,6 +50,7 @@ message Call {
GET_STATE = 9;
GET_CONTAINERS = 10;
+ GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -120,6 +121,7 @@ message Response {
GET_STATE = 8;
GET_CONTAINERS = 9;
+ GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -181,6 +183,17 @@ message Response {
repeated Container containers = 1;
}
+ // Information about all the frameworks known to the agent at the current
+ // time. Carried in `Response::get_frameworks` for responses of type
+ // `GET_FRAMEWORKS`.
+ message GetFrameworks {
+ // One entry per framework; it wraps only the framework's `FrameworkInfo`.
+ message Framework {
+ required FrameworkInfo framework_info = 1;
+ }
+
+ // Frameworks the agent currently knows about.
+ repeated Framework frameworks = 1;
+ // Frameworks the agent has recorded as completed.
+ repeated Framework completed_frameworks = 2;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -192,4 +205,5 @@ message Response {
optional ReadFile read_file = 8;
optional GetState get_state = 9;
optional GetContainers get_containers = 10;
-}
+ optional GetFrameworks get_frameworks = 11;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/e71233ca/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 213c428..878e499 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -33,7 +33,7 @@ message Call {
// included in the corresponding `Call::Foo` message. Similarly, if a call
// receives a synchronous response it will be returned as a `Response`
// message of type `Response::FOO`.
- enum Type {
+ enum Type {
UNKNOWN = 0;
GET_HEALTH = 1; // Retrieves the agent's health status.
@@ -50,6 +50,7 @@ message Call {
GET_STATE = 9;
GET_CONTAINERS = 10;
+ GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -120,6 +121,7 @@ message Response {
GET_STATE = 8;
GET_CONTAINERS = 9;
+ GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -181,6 +183,17 @@ message Response {
repeated Container containers = 1;
}
+ // Information about all the frameworks known to the agent at the current
+ // time. Carried in `Response::get_frameworks` for responses of type
+ // `GET_FRAMEWORKS`.
+ message GetFrameworks {
+ // One entry per framework; it wraps only the framework's `FrameworkInfo`.
+ message Framework {
+ required FrameworkInfo framework_info = 1;
+ }
+
+ // Frameworks the agent currently knows about.
+ repeated Framework frameworks = 1;
+ // Frameworks the agent has recorded as completed.
+ repeated Framework completed_frameworks = 2;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -192,4 +205,5 @@ message Response {
optional ReadFile read_file = 8;
optional GetState get_state = 9;
optional GetContainers get_containers = 10;
-}
+ optional GetFrameworks get_frameworks = 11;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/e71233ca/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 21c7ebf..7132e7a 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -389,6 +389,9 @@ Future<Response> Slave::Http::api(
case agent::Call::GET_CONTAINERS:
return getContainers(call, principal, acceptType);
+
+ case agent::Call::GET_FRAMEWORKS:
+ return getFrameworks(call, principal, acceptType);
}
UNREACHABLE();
@@ -1051,6 +1054,72 @@ Future<Response> Slave::Http::state(
}
+// Serves the `GET_FRAMEWORKS` call: obtains an approver for viewing
+// frameworks and replies with the `GetFrameworks` message, serialized
+// according to `contentType`.
+Future<Response> Slave::Http::getFrameworks(
+    const agent::Call& call,
+    const Option<string>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(agent::Call::GET_FRAMEWORKS, call.type());
+
+  // The `ObjectApprover` used to filter the frameworks in the response.
+  Future<Owned<ObjectApprover>> approver;
+
+  if (slave->authorizer.isNone()) {
+    // No authorizer is configured, so an `AcceptingObjectApprover` is used.
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  } else {
+    // The subject's value is only set when a principal was supplied.
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    approver = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::VIEW_FRAMEWORK);
+  }
+
+  // The continuation is deferred to `slave->self()`.
+  return approver.then(defer(
+      slave->self(),
+      [this, contentType](const Owned<ObjectApprover>& frameworksApprover)
+          -> Future<Response> {
+        agent::Response reply;
+        reply.set_type(agent::Response::GET_FRAMEWORKS);
+
+        agent::Response::GetFrameworks* body = reply.mutable_get_frameworks();
+        body->CopyFrom(_getFrameworks(frameworksApprover));
+
+        return OK(
+            serialize(contentType, evolve(reply)),
+            stringify(contentType));
+      }));
+}
+
+
+// Assembles the `GetFrameworks` message from `slave->frameworks` and
+// `slave->completedFrameworks`, leaving out every framework that
+// `frameworksApprover` does not approve.
+agent::Response::GetFrameworks Slave::Http::_getFrameworks(
+    const Owned<ObjectApprover>& frameworksApprover) const
+{
+  agent::Response::GetFrameworks result;
+
+  // Current frameworks: only approved ones are reported.
+  foreachvalue (const Framework* current, slave->frameworks) {
+    if (approveViewFrameworkInfo(frameworksApprover, current->info)) {
+      agent::Response::GetFrameworks::Framework* entry =
+        result.add_frameworks();
+
+      entry->mutable_framework_info()->CopyFrom(current->info);
+    }
+  }
+
+  // Completed frameworks are filtered the same way.
+  foreach (const Owned<Framework>& completed, slave->completedFrameworks) {
+    if (approveViewFrameworkInfo(frameworksApprover, completed->info)) {
+      agent::Response::GetFrameworks::Framework* entry =
+        result.add_completed_frameworks();
+
+      entry->mutable_framework_info()->CopyFrom(completed->info);
+    }
+  }
+
+  return result;
+}
+
+
string Slave::Http::STATISTICS_HELP()
{
return HELP(
http://git-wip-us.apache.org/repos/asf/mesos/blob/e71233ca/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 42afa9e..8e1921c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -549,6 +549,14 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
+ // Handler for the `GET_FRAMEWORKS` agent API call; the response body is
+ // serialized according to `contentType`.
+ process::Future<process::http::Response> getFrameworks(
+ const mesos::agent::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ // Builds the `GetFrameworks` message, consulting `frameworksApprover` to
+ // decide which frameworks are included.
+ mesos::agent::Response::GetFrameworks _getFrameworks(
+ const process::Owned<ObjectApprover>& frameworksApprover) const;
+
Slave* slave;
// Used to rate limit the statistics endpoint.
http://git-wip-us.apache.org/repos/asf/mesos/blob/e71233ca/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index b07e80a..c5720f3 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -90,6 +90,9 @@ Option<Error> validate(
case mesos::agent::Call::GET_CONTAINERS:
return None();
+
+ case mesos::agent::Call::GET_FRAMEWORKS:
+ return None();
}
UNREACHABLE();
[7/8] mesos git commit: Added test case for 'GetTasks' call in v1
agent API.
Posted by vi...@apache.org.
Added test case for 'GetTasks' call in v1 agent API.
Review: https://reviews.apache.org/r/49799/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc803f5a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc803f5a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc803f5a
Branch: refs/heads/master
Commit: dc803f5a57ef50599d784f971a48445b5a4d6c88
Parents: 3230093
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:39 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:39 2016 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 126 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 126 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc803f5a/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 3de11ab..8250c0d 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2869,6 +2869,132 @@ TEST_P(AgentAPITest, GetExecutors)
}
}
+
+// Checks the `GET_TASKS` call of the v1 agent API at three points: before
+// any task is launched (every task list is empty), while the task is
+// running (one launched task), and after the task was killed and the
+// acknowledgement was handled (one completed task).
+TEST_P(AgentAPITest, GetTasks)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  // This check has to be fatal: the first offer is indexed right below,
+  // which would read out of bounds if the list were empty.
+  ASSERT_NE(0u, offers.get().size());
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+  task.mutable_command()->MergeFrom(command);
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  ContentType contentType = GetParam();
+
+  // No tasks launched, we should expect zero tasks in Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_TASKS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_tasks().pending_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().queued_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().launched_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().terminated_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().completed_tasks_size());
+  }
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  // A task launched, we expect one task in Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_TASKS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_tasks().pending_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().queued_tasks_size());
+    ASSERT_EQ(1, v1Response.get().get_tasks().launched_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().terminated_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().completed_tasks_size());
+  }
+
+  Clock::pause();
+
+  // Kill the task.
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver.killTask(statusRunning->task_id());
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+
+  // Make sure the agent receives and properly handles the ACK.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+  Clock::settle();
+  Clock::resume();
+
+  // After the executor terminated, we should expect one completed task in
+  // Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_TASKS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_tasks().pending_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().queued_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().launched_tasks_size());
+    ASSERT_EQ(0, v1Response.get().get_tasks().terminated_tasks_size());
+    ASSERT_EQ(1, v1Response.get().get_tasks().completed_tasks_size());
+  }
+
+  driver.stop();
+  driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[6/8] mesos git commit: Added test case for 'GetExecutors' call in v1
agent API.
Posted by vi...@apache.org.
Added test case for 'GetExecutors' call in v1 agent API.
Review: https://reviews.apache.org/r/49798/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3230093a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3230093a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3230093a
Branch: refs/heads/master
Commit: 3230093a3a03ab34a6b0ad866909b4af4258c0eb
Parents: 4990e7c
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:34 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:34 2016 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 107 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 107 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3230093a/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 40f5077..3de11ab 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2762,6 +2762,113 @@ TEST_P(AgentAPITest, GetFrameworks)
}
}
+
+// Checks the `GET_EXECUTORS` call of the v1 agent API at three points:
+// before any task is launched (no executors), while the task is running
+// (one executor), and after the driver was stopped and the executor
+// terminated (one completed executor).
+TEST_P(AgentAPITest, GetExecutors)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  // This check has to be fatal: the first offer is indexed right below,
+  // which would read out of bounds if the list were empty.
+  ASSERT_NE(0u, offers.get().size());
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+  task.mutable_command()->MergeFrom(command);
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  ContentType contentType = GetParam();
+
+  // No tasks launched, we should expect zero executors in Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_executors().executors_size());
+    ASSERT_EQ(0, v1Response.get().get_executors().completed_executors_size());
+  }
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  // A task launched, we expect one executor in Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response.get().type());
+    ASSERT_EQ(1, v1Response.get().get_executors().executors_size());
+    ASSERT_EQ(0, v1Response.get().get_executors().completed_executors_size());
+  }
+
+  // Make sure the executor terminated.
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(executorTerminated);
+
+  // Make sure `Framework::destroyExecutor()` is processed.
+  Clock::pause();
+  Clock::settle();
+
+  // After the executor terminated, we should expect one completed executor in
+  // Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response.get().type());
+    ASSERT_EQ(0, v1Response.get().get_executors().executors_size());
+    ASSERT_EQ(1, v1Response.get().get_executors().completed_executors_size());
+  }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[8/8] mesos git commit: Added test case for 'GetState' call in v1
agent API.
Posted by vi...@apache.org.
Added test case for 'GetState' call in v1 agent API.
Review: https://reviews.apache.org/r/49800/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b413918b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b413918b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b413918b
Branch: refs/heads/master
Commit: b413918beffd0e1cbee975851aa70c50d568202d
Parents: dc803f5
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:44 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:44 2016 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 134 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b413918b/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 8250c0d..b4a6c46 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2995,6 +2995,140 @@ TEST_P(AgentAPITest, GetTasks)
driver.join();
}
+
+// Checks the `GET_STATE` call of the v1 agent API at three points: before
+// any task is launched (no frameworks, tasks or executors), while the task
+// is running (one of each), and after the task was killed and the executor
+// terminated (one completed framework, task and executor).
+TEST_P(AgentAPITest, GetState)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  // This check has to be fatal: the first offer is indexed right below,
+  // which would read out of bounds if the list were empty.
+  ASSERT_NE(0u, offers.get().size());
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+  task.mutable_command()->MergeFrom(command);
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  ContentType contentType = GetParam();
+
+  // NOTE: The generated `*_size()` accessors return a signed `int`, so the
+  // expected counts below are written as signed literals.
+
+  // GetState before task launch, we should expect zero
+  // frameworks/tasks/executors in Response.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_STATE);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response.get().type());
+
+    const v1::agent::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(0, getState.get_frameworks().frameworks_size());
+    ASSERT_EQ(0, getState.get_tasks().launched_tasks_size());
+    ASSERT_EQ(0, getState.get_executors().executors_size());
+  }
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  // GetState after task launch and check we have a running
+  // framework/task/executor.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_STATE);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response.get().type());
+
+    const v1::agent::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(1, getState.get_frameworks().frameworks_size());
+    ASSERT_EQ(0, getState.get_frameworks().completed_frameworks_size());
+    ASSERT_EQ(1, getState.get_tasks().launched_tasks_size());
+    ASSERT_EQ(0, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(1, getState.get_executors().executors_size());
+    ASSERT_EQ(0, getState.get_executors().completed_executors_size());
+  }
+
+  // Kill the task.
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  driver.killTask(statusRunning->task_id());
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+
+  // Make sure the executor terminated.
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(executorTerminated);
+
+  // Make sure `Framework::destroyExecutor()` is processed.
+  Clock::pause();
+  Clock::settle();
+
+  // After the executor terminated, we should expect a completed
+  // framework/task/executor.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_STATE);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response.get().IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response.get().type());
+
+    const v1::agent::Response::GetState& getState = v1Response->get_state();
+    ASSERT_EQ(0, getState.get_frameworks().frameworks_size());
+    ASSERT_EQ(1, getState.get_frameworks().completed_frameworks_size());
+    ASSERT_EQ(0, getState.get_tasks().launched_tasks_size());
+    ASSERT_EQ(1, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(0, getState.get_executors().executors_size());
+    ASSERT_EQ(1, getState.get_executors().completed_executors_size());
+  }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[3/8] mesos git commit: Implemented 'GetTasks' call in v1 agent API.
Posted by vi...@apache.org.
Implemented 'GetTasks' call in v1 agent API.
Review: https://reviews.apache.org/r/49759/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d8e4b05b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d8e4b05b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d8e4b05b
Branch: refs/heads/master
Commit: d8e4b05bce33e9a1a459d328fc7f97c73fe4eeee
Parents: 950e9ce
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:16 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:16 2016 -0700
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 24 +++++
include/mesos/v1/agent/agent.proto | 24 +++++
src/slave/http.cpp | 184 ++++++++++++++++++++++++++++++++
src/slave/slave.hpp | 10 ++
src/slave/validation.cpp | 3 +
5 files changed, 245 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 2d39a63..528b8b3 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -52,6 +52,7 @@ message Call {
GET_CONTAINERS = 10;
GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
GET_EXECUTORS = 12; // Retrieves the information about known executors.
+ GET_TASKS = 13; // Retrieves the information about known tasks.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -124,6 +125,7 @@ message Response {
GET_CONTAINERS = 9;
GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
GET_EXECUTORS = 11; // See 'GetExecutors' below.
+ GET_TASKS = 12; // See 'GetTasks' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -207,6 +209,27 @@ message Response {
repeated Executor completed_executors = 2;
}
+ // Lists information about all the tasks known to the agent at the current
+ // time. Carried in `Response::get_tasks` for responses of type `GET_TASKS`.
+ message GetTasks {
+ // Tasks that are pending in the agent's queue before an executor is
+ // launched.
+ repeated Task pending_tasks = 1;
+
+ // Tasks that are enqueued for a launched executor that has not yet
+ // registered.
+ repeated Task queued_tasks = 2;
+
+ // Tasks that are running.
+ repeated Task launched_tasks = 3;
+
+ // Tasks that have terminated but still have updates pending.
+ repeated Task terminated_tasks = 4;
+
+ // Tasks that have terminated and whose updates have been acknowledged.
+ repeated Task completed_tasks = 5;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -220,4 +243,5 @@ message Response {
optional GetContainers get_containers = 10;
optional GetFrameworks get_frameworks = 11;
optional GetExecutors get_executors = 12;
+ optional GetTasks get_tasks = 13;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 052f942..699a17b 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -52,6 +52,7 @@ message Call {
GET_CONTAINERS = 10;
GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
GET_EXECUTORS = 12; // Retrieves the information about known executors.
+ GET_TASKS = 13; // Retrieves the information about known tasks.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -124,6 +125,7 @@ message Response {
GET_CONTAINERS = 9;
GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
GET_EXECUTORS = 11; // See 'GetExecutors' below.
+ GET_TASKS = 12; // See 'GetTasks' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -207,6 +209,27 @@ message Response {
repeated Executor completed_executors = 2;
}
+ // Lists information about all the tasks known to the agent at the current
+ // time. Carried in `Response::get_tasks` for responses of type `GET_TASKS`.
+ message GetTasks {
+ // Tasks that are pending in the agent's queue before an executor is
+ // launched.
+ repeated Task pending_tasks = 1;
+
+ // Tasks that are enqueued for a launched executor that has not yet
+ // registered.
+ repeated Task queued_tasks = 2;
+
+ // Tasks that are running.
+ repeated Task launched_tasks = 3;
+
+ // Tasks that have terminated but still have updates pending.
+ repeated Task terminated_tasks = 4;
+
+ // Tasks that have terminated and whose updates have been acknowledged.
+ repeated Task completed_tasks = 5;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -220,4 +243,5 @@ message Response {
optional GetContainers get_containers = 10;
optional GetFrameworks get_frameworks = 11;
optional GetExecutors get_executors = 12;
+ optional GetTasks get_tasks = 13;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index a242e0b..63968bf 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -395,6 +395,9 @@ Future<Response> Slave::Http::api(
case agent::Call::GET_EXECUTORS:
return getExecutors(call, principal, acceptType);
+
+ case agent::Call::GET_TASKS:
+ return getTasks(call, principal, acceptType);
}
UNREACHABLE();
@@ -1227,6 +1230,187 @@ agent::Response::GetExecutors Slave::Http::_getExecutors(
}
+Future<Response> Slave::Http::getTasks(
+ const agent::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(agent::Call::GET_TASKS, call.type());
+
+ // Retrieve `ObjectApprover`s for authorizing frameworks, tasks and executors.
+ Future<Owned<ObjectApprover>> frameworksApprover;
+ Future<Owned<ObjectApprover>> tasksApprover;
+ Future<Owned<ObjectApprover>> executorsApprover;
+ if (slave->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ frameworksApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_FRAMEWORK);
+
+ tasksApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_TASK);
+
+ executorsApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_EXECUTOR);
+ } else {
+ frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ }
+
+ return collect(frameworksApprover, tasksApprover, executorsApprover)
+ .then(defer(slave->self(),
+ [this, contentType](const tuple<Owned<ObjectApprover>,
+ Owned<ObjectApprover>,
+ Owned<ObjectApprover>>& approvers)
+ -> Future<Response> {
+ // Unpack the three approvers from the tuple.
+ Owned<ObjectApprover> frameworksApprover;
+ Owned<ObjectApprover> tasksApprover;
+ Owned<ObjectApprover> executorsApprover;
+ tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
+ agent::Response response;
+ response.set_type(agent::Response::GET_TASKS);
+
+ response.mutable_get_tasks()->CopyFrom(
+ _getTasks(frameworksApprover,
+ tasksApprover,
+ executorsApprover));
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
+
+
+agent::Response::GetTasks Slave::Http::_getTasks(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& tasksApprover,
+ const Owned<ObjectApprover>& executorsApprover) const
+{
+ // Collect the authorized frameworks, both active and completed.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, slave->frameworks) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
+
+ frameworks.push_back(framework);
+ }
+
+ foreach (const Owned<Framework>& framework, slave->completedFrameworks) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
+
+ frameworks.push_back(framework.get());
+ }
+
+ // Map each authorized executor (active or completed) to its framework.
+ hashmap<const Executor*, const Framework*> executors;
+ foreach (const Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ // Skip unauthorized executors.
+ if (!approveViewExecutorInfo(executorsApprover,
+ executor->info,
+ framework->info)) {
+ continue;
+ }
+
+ executors.put(executor, framework);
+ }
+
+ foreach (const Owned<Executor>& executor, framework->completedExecutors) {
+ // Skip unauthorized executors.
+ if (!approveViewExecutorInfo(executorsApprover,
+ executor->info,
+ framework->info)) {
+ continue;
+ }
+
+ executors.put(executor.get(), framework);
+ }
+ }
+
+ agent::Response::GetTasks getTasks;
+
+ foreach (const Framework* framework, frameworks) {
+ // Pending tasks, reported in the `TASK_STAGING` state.
+ typedef hashmap<TaskID, TaskInfo> TaskMap;
+ foreachvalue (const TaskMap& taskInfos, framework->pending) {
+ foreachvalue (const TaskInfo& taskInfo, taskInfos) {
+ // Skip unauthorized tasks.
+ if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+ continue;
+ }
+
+ const Task& task =
+ protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+ getTasks.add_pending_tasks()->CopyFrom(task);
+ }
+ }
+ }
+
+ foreachpair (const Executor* executor,
+ const Framework* framework,
+ executors) {
+ // Queued tasks, also reported in the `TASK_STAGING` state.
+ foreach (const TaskInfo& taskInfo, executor->queuedTasks.values()) {
+ // Skip unauthorized tasks.
+ if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+ continue;
+ }
+
+ const Task& task =
+ protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+ getTasks.add_queued_tasks()->CopyFrom(task);
+ }
+
+ // Launched tasks.
+ foreach (Task* task, executor->launchedTasks.values()) {
+ CHECK_NOTNULL(task);
+ // Skip unauthorized tasks.
+ if (!approveViewTask(tasksApprover, *task, framework->info)) {
+ continue;
+ }
+
+ getTasks.add_launched_tasks()->CopyFrom(*task);
+ }
+
+ // Terminated tasks.
+ foreach (Task* task, executor->terminatedTasks.values()) {
+ CHECK_NOTNULL(task);
+ // Skip unauthorized tasks.
+ if (!approveViewTask(tasksApprover, *task, framework->info)) {
+ continue;
+ }
+
+ getTasks.add_terminated_tasks()->CopyFrom(*task);
+ }
+
+ // Completed tasks.
+ foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
+ // Skip unauthorized tasks.
+ if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+ continue;
+ }
+
+ getTasks.add_completed_tasks()->CopyFrom(*task);
+ }
+ }
+
+ return getTasks;
+}
+
+
string Slave::Http::STATISTICS_HELP()
{
return HELP(
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 48cf77d..4995c84 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -566,6 +566,16 @@ private:
const process::Owned<ObjectApprover>& frameworksApprover,
const process::Owned<ObjectApprover>& executorsApprover) const;
+ process::Future<process::http::Response> getTasks(
+ const mesos::agent::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ mesos::agent::Response::GetTasks _getTasks(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& tasksApprover,
+ const process::Owned<ObjectApprover>& executorsApprover) const;
+
Slave* slave;
// Used to rate limit the statistics endpoint.
http://git-wip-us.apache.org/repos/asf/mesos/blob/d8e4b05b/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index 717169a..a9f3182 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -96,6 +96,9 @@ Option<Error> validate(
case mesos::agent::Call::GET_EXECUTORS:
return None();
+
+ case mesos::agent::Call::GET_TASKS:
+ return None();
}
UNREACHABLE();
[2/8] mesos git commit: Implemented 'GetExecutors' call in v1 agent
API.
Posted by vi...@apache.org.
Implemented 'GetExecutors' call in v1 agent API.
Review: https://reviews.apache.org/r/49758/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/950e9ce9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/950e9ce9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/950e9ce9
Branch: refs/heads/master
Commit: 950e9ce90f08f2f3c946fed8dd0bdfe1d1d75c29
Parents: e71233c
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:11 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:11 2016 -0700
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 14 +++++
include/mesos/v1/agent/agent.proto | 14 +++++
src/slave/http.cpp | 107 ++++++++++++++++++++++++++++++++
src/slave/slave.hpp | 9 +++
src/slave/validation.cpp | 3 +
5 files changed, 147 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/950e9ce9/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 7acb2d4..2d39a63 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -51,6 +51,7 @@ message Call {
GET_CONTAINERS = 10;
GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
+ GET_EXECUTORS = 12; // Retrieves the information about known executors.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -122,6 +123,7 @@ message Response {
GET_CONTAINERS = 9;
GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
+ GET_EXECUTORS = 11; // See 'GetExecutors' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -194,6 +196,17 @@ message Response {
repeated Framework completed_frameworks = 2;
}
+ // Lists information about all the executors known to the agent at the
+ // current time.
+ message GetExecutors {
+ message Executor {
+ required ExecutorInfo executor_info = 1;
+ }
+
+ repeated Executor executors = 1;
+ repeated Executor completed_executors = 2;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -206,4 +219,5 @@ message Response {
optional GetState get_state = 9;
optional GetContainers get_containers = 10;
optional GetFrameworks get_frameworks = 11;
+ optional GetExecutors get_executors = 12;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/950e9ce9/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 878e499..052f942 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -51,6 +51,7 @@ message Call {
GET_CONTAINERS = 10;
GET_FRAMEWORKS = 11; // Retrieves the information about known frameworks.
+ GET_EXECUTORS = 12; // Retrieves the information about known executors.
}
// Provides a snapshot of the current metrics tracked by the agent.
@@ -122,6 +123,7 @@ message Response {
GET_CONTAINERS = 9;
GET_FRAMEWORKS = 10; // See 'GetFrameworks' below.
+ GET_EXECUTORS = 11; // See 'GetExecutors' below.
}
// `healthy` would be true if the agent is healthy. Delayed responses are also
@@ -194,6 +196,17 @@ message Response {
repeated Framework completed_frameworks = 2;
}
+ // Lists information about all the executors known to the agent at the
+ // current time.
+ message GetExecutors {
+ message Executor {
+ required ExecutorInfo executor_info = 1;
+ }
+
+ repeated Executor executors = 1;
+ repeated Executor completed_executors = 2;
+ }
+
optional Type type = 1;
optional GetHealth get_health = 2;
@@ -206,4 +219,5 @@ message Response {
optional GetState get_state = 9;
optional GetContainers get_containers = 10;
optional GetFrameworks get_frameworks = 11;
+ optional GetExecutors get_executors = 12;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mesos/blob/950e9ce9/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 7132e7a..a242e0b 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -392,6 +392,9 @@ Future<Response> Slave::Http::api(
case agent::Call::GET_FRAMEWORKS:
return getFrameworks(call, principal, acceptType);
+
+ case agent::Call::GET_EXECUTORS:
+ return getExecutors(call, principal, acceptType);
}
UNREACHABLE();
@@ -1120,6 +1123,110 @@ agent::Response::GetFrameworks Slave::Http::_getFrameworks(
}
+Future<Response> Slave::Http::getExecutors(
+ const agent::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(agent::Call::GET_EXECUTORS, call.type());
+
+ // Retrieve `ObjectApprover`s for authorizing frameworks and executors.
+ Future<Owned<ObjectApprover>> frameworksApprover;
+ Future<Owned<ObjectApprover>> executorsApprover;
+ if (slave->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ frameworksApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_FRAMEWORK);
+
+ executorsApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_EXECUTOR);
+ } else {
+ frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ }
+
+ return collect(frameworksApprover, executorsApprover)
+ .then(defer(slave->self(),
+ [this, contentType](const tuple<Owned<ObjectApprover>,
+ Owned<ObjectApprover>>& approvers)
+ -> Future<Response> {
+ // Unpack both approvers from the tuple.
+ Owned<ObjectApprover> frameworksApprover;
+ Owned<ObjectApprover> executorsApprover;
+ tie(frameworksApprover, executorsApprover) = approvers;
+
+ agent::Response response;
+ response.set_type(agent::Response::GET_EXECUTORS);
+
+ response.mutable_get_executors()->CopyFrom(
+ _getExecutors(frameworksApprover, executorsApprover));
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
+
+
+agent::Response::GetExecutors Slave::Http::_getExecutors(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& executorsApprover) const
+{
+ // Collect the authorized frameworks, both active and completed.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, slave->frameworks) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
+
+ frameworks.push_back(framework);
+ }
+
+ foreach (const Owned<Framework>& framework, slave->completedFrameworks) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
+
+ frameworks.push_back(framework.get());
+ }
+
+ agent::Response::GetExecutors getExecutors;
+
+ foreach (const Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ // Skip unauthorized executors.
+ if (!approveViewExecutorInfo(executorsApprover,
+ executor->info,
+ framework->info)) {
+ continue;
+ }
+
+ getExecutors.add_executors()->mutable_executor_info()->CopyFrom(
+ executor->info);
+ }
+
+ foreach (const Owned<Executor>& executor, framework->completedExecutors) {
+ // Skip unauthorized executors.
+ if (!approveViewExecutorInfo(executorsApprover,
+ executor->info,
+ framework->info)) {
+ continue;
+ }
+
+ getExecutors.add_completed_executors()->mutable_executor_info()->CopyFrom(
+ executor->info);
+ }
+ }
+
+ return getExecutors;
+}
+
+
string Slave::Http::STATISTICS_HELP()
{
return HELP(
http://git-wip-us.apache.org/repos/asf/mesos/blob/950e9ce9/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 8e1921c..48cf77d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -557,6 +557,15 @@ private:
mesos::agent::Response::GetFrameworks _getFrameworks(
const process::Owned<ObjectApprover>& frameworksApprover) const;
+ process::Future<process::http::Response> getExecutors(
+ const mesos::agent::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ mesos::agent::Response::GetExecutors _getExecutors(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& executorsApprover) const;
+
Slave* slave;
// Used to rate limit the statistics endpoint.
http://git-wip-us.apache.org/repos/asf/mesos/blob/950e9ce9/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index c5720f3..717169a 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -93,6 +93,9 @@ Option<Error> validate(
case mesos::agent::Call::GET_FRAMEWORKS:
return None();
+
+ case mesos::agent::Call::GET_EXECUTORS:
+ return None();
}
UNREACHABLE();
[5/8] mesos git commit: Added test case for 'GetFrameworks' call in
v1 agent API.
Posted by vi...@apache.org.
Added test case for 'GetFrameworks' call in v1 agent API.
Review: https://reviews.apache.org/r/49797/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4990e7c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4990e7c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4990e7c4
Branch: refs/heads/master
Commit: 4990e7c496583d46d8355c2bcd3934c968c31015
Parents: a298179
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:28 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:28 2016 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 103 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 103 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4990e7c4/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 55e825e..40f5077 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2659,6 +2659,109 @@ TEST_P(AgentAPITest, ReadFileInvalidPath)
AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotFound().status, response);
}
+
+TEST_P(AgentAPITest, GetFrameworks)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offer.slave_id());
+ task.mutable_resources()->MergeFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+ task.mutable_command()->MergeFrom(command);
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning));
+
+ ContentType contentType = GetParam();
+
+ // No task has been launched yet, so the response should list no frameworks.
+ {
+ v1::agent::Call v1Call;
+ v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);
+
+ Future<v1::agent::Response> v1Response =
+ post(slave.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response.get().IsInitialized());
+ ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response.get().type());
+ ASSERT_EQ(0, v1Response.get().get_frameworks().frameworks_size());
+ ASSERT_EQ(0, v1Response.get().get_frameworks().completed_frameworks_size());
+ }
+
+ driver.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+ // With one task running, the response should list one active framework.
+ {
+ v1::agent::Call v1Call;
+ v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);
+
+ Future<v1::agent::Response> v1Response =
+ post(slave.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response.get().IsInitialized());
+ ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response.get().type());
+ ASSERT_EQ(1, v1Response.get().get_frameworks().frameworks_size());
+ ASSERT_EQ(0, v1Response.get().get_frameworks().completed_frameworks_size());
+ }
+
+ // Capture the `executorTerminated` dispatch so we can wait for it below.
+ Future<Nothing> executorTerminated =
+ FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(executorTerminated);
+
+ // After the executor has terminated, the framework should have moved to
+ // `completed_frameworks`.
+ {
+ v1::agent::Call v1Call;
+ v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);
+
+ Future<v1::agent::Response> v1Response =
+ post(slave.get()->pid, v1Call, contentType);
+
+ AWAIT_READY(v1Response);
+ ASSERT_TRUE(v1Response.get().IsInitialized());
+ ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response.get().type());
+ ASSERT_EQ(0, v1Response.get().get_frameworks().frameworks_size());
+ ASSERT_EQ(1, v1Response.get().get_frameworks().completed_frameworks_size());
+ }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[4/8] mesos git commit: Implemented 'GetState' call in v1 agent API.
Posted by vi...@apache.org.
Implemented 'GetState' call in v1 agent API.
Review: https://reviews.apache.org/r/49760/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2981795
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2981795
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2981795
Branch: refs/heads/master
Commit: a298179585c727255c6637d77229d251fb8dd1af
Parents: d8e4b05
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Jul 13 12:10:22 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 13 12:10:22 2016 -0700
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 6 ++-
include/mesos/v1/agent/agent.proto | 6 ++-
src/slave/http.cpp | 78 ++++++++++++++++++++++++++++++++-
src/slave/slave.hpp | 10 +++++
4 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2981795/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 528b8b3..5b91677 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -167,8 +167,12 @@ message Response {
required bytes data = 2;
}
+ // Contains the full state of the agent, i.e., information about the tasks,
+ // frameworks and executors (active and completed) known to this agent.
message GetState {
- // TODO(vinod): Fill in the fields.
+ optional GetTasks get_tasks = 1;
+ optional GetExecutors get_executors = 2;
+ optional GetFrameworks get_frameworks = 3;
}
// Information about containers running on this agent. It contains
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2981795/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 699a17b..8145669 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -167,8 +167,12 @@ message Response {
required bytes data = 2;
}
+ // Contains the full state of the agent, i.e., information about the tasks,
+ // frameworks and executors (active and completed) known to this agent.
message GetState {
- // TODO(vinod): Fill in the fields.
+ optional GetTasks get_tasks = 1;
+ optional GetExecutors get_executors = 2;
+ optional GetFrameworks get_frameworks = 3;
}
// Information about containers running on this agent. It contains
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2981795/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 63968bf..2cbf3c5 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -385,7 +385,7 @@ Future<Response> Slave::Http::api(
return readFile(call, principal, acceptType);
case agent::Call::GET_STATE:
- return NotImplemented();
+ return getState(call, principal, acceptType);
case agent::Call::GET_CONTAINERS:
return getContainers(call, principal, acceptType);
@@ -1411,6 +1411,82 @@ agent::Response::GetTasks Slave::Http::_getTasks(
}
+Future<Response> Slave::Http::getState(
+ const agent::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(agent::Call::GET_STATE, call.type());
+
+ // Retrieve `ObjectApprover`s for authorizing frameworks, tasks and executors.
+ Future<Owned<ObjectApprover>> frameworksApprover;
+ Future<Owned<ObjectApprover>> tasksApprover;
+ Future<Owned<ObjectApprover>> executorsApprover;
+ if (slave->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ frameworksApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_FRAMEWORK);
+
+ tasksApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_TASK);
+
+ executorsApprover = slave->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_EXECUTOR);
+ } else {
+ frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ }
+
+ return collect(frameworksApprover, tasksApprover, executorsApprover)
+ .then(defer(slave->self(),
+ [=](const tuple<Owned<ObjectApprover>,
+ Owned<ObjectApprover>,
+ Owned<ObjectApprover>>& approvers)
+ -> Future<Response> {
+ // Unpack the three approvers from the tuple.
+ Owned<ObjectApprover> frameworksApprover;
+ Owned<ObjectApprover> tasksApprover;
+ Owned<ObjectApprover> executorsApprover;
+ tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
+ agent::Response response;
+ response.set_type(agent::Response::GET_STATE);
+ response.mutable_get_state()->CopyFrom(
+ _getState(frameworksApprover,
+ tasksApprover,
+ executorsApprover));
+
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
+
+
+agent::Response::GetState Slave::Http::_getState(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& tasksApprover,
+ const Owned<ObjectApprover>& executorsApprover) const
+{
+ agent::Response::GetState getState;
+
+ getState.mutable_get_tasks()->CopyFrom(
+ _getTasks(frameworksApprover, tasksApprover, executorsApprover));
+
+ getState.mutable_get_executors()->CopyFrom(
+ _getExecutors(frameworksApprover, executorsApprover));
+
+ getState.mutable_get_frameworks()->CopyFrom(
+ _getFrameworks(frameworksApprover));
+
+ return getState;
+}
+
+
string Slave::Http::STATISTICS_HELP()
{
return HELP(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2981795/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 4995c84..9864cf4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -576,6 +576,16 @@ private:
const process::Owned<ObjectApprover>& tasksApprover,
const process::Owned<ObjectApprover>& executorsApprover) const;
+ process::Future<process::http::Response> getState(
+ const mesos::agent::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
+
+ mesos::agent::Response::GetState _getState(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& tasksApprover,
+ const process::Owned<ObjectApprover>& executorsApprover) const;
+
Slave* slave;
// Used to rate limit the statistics endpoint.