You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/16 23:03:29 UTC
git commit: MESOS-1312: Added orphan_tasks and
unregistered_frameworks in master's state.json.
Repository: mesos
Updated Branches:
refs/heads/master a8c8b8963 -> cf6b541c9
MESOS-1312: Added orphan_tasks and unregistered_frameworks in
master's state.json.
Added orphan_tasks and unregistered_frameworks to state.json.
orphan_tasks lists the tasks that are currently running on the slaves
but do not belong to any (re-)registered framework, and
unregistered_frameworks lists the IDs of the frameworks those tasks
belong to.
Review: https://reviews.apache.org/r/22223
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf6b541c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf6b541c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf6b541c
Branch: refs/heads/master
Commit: cf6b541c907b1fa35cbf83532e47dde73a1e5500
Parents: a8c8b89
Author: Yifan Gu <yi...@mesosphere.io>
Authored: Mon Jun 16 14:01:55 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Jun 16 14:03:08 2014 -0700
----------------------------------------------------------------------
src/master/http.cpp | 38 +++++++++
src/tests/master_tests.cpp | 169 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 207 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cf6b541c/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index b565dc6..5d86976 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -540,6 +540,44 @@ Future<Response> Master::Http::state(const Request& request)
object.values["completed_frameworks"] = array;
}
+  // Model the orphan tasks: every task held by an activated slave
+  // whose framework is not among the master's activated frameworks.
+  {
+    typedef hashmap<TaskID, Task*> TaskMap;
+
+    JSON::Array orphans;
+
+    // 'slave->tasks' is keyed by framework; walk each framework's
+    // task map and keep only the tasks without an activated framework.
+    foreachvalue (const Slave* slave, master.slaves.activated) {
+      foreachvalue (const TaskMap& frameworkTasks, slave->tasks) {
+        foreachvalue (const Task* orphan, frameworkTasks) {
+          CHECK_NOTNULL(orphan);
+
+          const bool activated =
+            master.frameworks.activated.contains(orphan->framework_id());
+
+          if (!activated) {
+            orphans.values.push_back(model(*orphan));
+          }
+        }
+      }
+    }
+
+    object.values["orphan_tasks"] = orphans;
+  }
+
+  // Model all currently unregistered frameworks, i.e. frameworks that
+  // have tasks on an activated slave but are not among the master's
+  // activated frameworks. This could happen when the framework has yet
+  // to re-register after master failover.
+  {
+    JSON::Array array;
+
+    // A framework may have tasks on several slaves; remember the IDs
+    // already emitted so that each unregistered framework is listed
+    // exactly once. Only key presence matters in this map.
+    hashmap<FrameworkID, bool> seen;
+
+    // Find unregistered frameworks.
+    foreachvalue (const Slave* slave, master.slaves.activated) {
+      foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+        if (!master.frameworks.activated.contains(frameworkId) &&
+            !seen.contains(frameworkId)) {
+          seen[frameworkId] = true;
+          array.values.push_back(frameworkId.value());
+        }
+      }
+    }
+
+    object.values["unregistered_frameworks"] = array;
+  }
+
return OK(object, request.query.get("jsonp"));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/cf6b541c/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 34df121..edcaa75 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -82,6 +82,7 @@ using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
+using testing::SaveArg;
// Those of the overall Mesos master/slave/scheduler/driver tests
// that seem vaguely more master than slave-related are in this file.
@@ -1919,3 +1920,171 @@ TEST_F(MasterZooKeeperTest, LostZooKeeperCluster)
}
#endif // MESOS_HAS_JAVA
+
+
+// This test ensures that when a master fails over, those tasks that
+// belong to a framework which has not yet re-registered appear in the
+// "orphan_tasks" field of state.json, and that the framework itself
+// appears in the "unregistered_frameworks" field. Once the framework
+// has re-registered, both fields are expected to be empty again.
+TEST_F(MasterTest, OrphanTasks)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  StandaloneMasterDetector detector(master.get());
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  // Create a task on the slave.
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId))
+    .WillRepeatedly(Return()); // Ignore subsequent events.
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Get the master's state.
+  Future<process::http::Response> response =
+    process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  JSON::Object state = parse.get();
+
+  // Record the original framework and task info. The framework count
+  // is asserted (fatal) before front() is used, because calling
+  // front() on an empty container is undefined behavior.
+  JSON::Array frameworks =
+    state.values["frameworks"].as<JSON::Array>();
+  ASSERT_EQ(1u, frameworks.values.size());
+
+  JSON::Object activeFramework =
+    frameworks.values.front().as<JSON::Object>();
+  JSON::String activeFrameworkId =
+    activeFramework.values["id"].as<JSON::String>();
+  JSON::Array activeTasks =
+    activeFramework.values["tasks"].as<JSON::Array>();
+  JSON::Array orphanTasks =
+    state.values["orphan_tasks"].as<JSON::Array>();
+  JSON::Array unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+
+  // Before the failover the framework is registered, so nothing is
+  // reported as orphaned or unregistered.
+  EXPECT_EQ(1u, activeTasks.values.size());
+  EXPECT_EQ(0u, orphanTasks.values.size());
+  EXPECT_EQ(0u, unknownFrameworksArray.values.size());
+  EXPECT_EQ(frameworkId.value(), activeFrameworkId.value);
+
+  EXPECT_CALL(sched, disconnected(&driver))
+    .Times(1);
+
+  // Stop the master.
+  Stop(master.get());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
+  // Drop the reregisterFrameworkMessage to delay the framework's
+  // re-registration.
+  Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+    DROP_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), master.get(), _);
+
+  Clock::pause();
+
+  // Fail over the master.
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Simulate a new master detected event to the slave and the framework.
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+  AWAIT_READY(reregisterFrameworkMessage);
+
+  // Get the master's state.
+  response = process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  // Verify that we have some orphan tasks and unregistered
+  // frameworks.
+  state = parse.get();
+  orphanTasks = state.values["orphan_tasks"].as<JSON::Array>();
+  EXPECT_EQ(activeTasks, orphanTasks);
+
+  // Assert (fatal) rather than expect: front() below must not be
+  // reached when the array is empty.
+  unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+  ASSERT_EQ(1u, unknownFrameworksArray.values.size());
+
+  JSON::String unknownFrameworkId =
+    unknownFrameworksArray.values.front().as<JSON::String>();
+  EXPECT_EQ(activeFrameworkId, unknownFrameworkId);
+
+  // Advance the clock to let the framework re-register with the master.
+  Clock::advance(Seconds(1));
+  Clock::settle();
+  Clock::resume();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  // Get the master's state.
+  response = process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  // Verify the orphan tasks and unregistered frameworks are removed.
+  state = parse.get();
+  unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+  EXPECT_EQ(0u, unknownFrameworksArray.values.size());
+
+  orphanTasks = state.values["orphan_tasks"].as<JSON::Array>();
+  EXPECT_EQ(0u, orphanTasks.values.size());
+
+  // Cleanup.
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}