You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/16 23:03:29 UTC

git commit: MESOS-1312: Added orphan_tasks and unregistered_frameworks in master's state.json.

Repository: mesos
Updated Branches:
  refs/heads/master a8c8b8963 -> cf6b541c9


MESOS-1312: Added orphan_tasks and unregistered_frameworks in
master's state.json.

Added orphan_tasks and unregistered_frameworks to state.json to display
tasks that are currently running on the slaves but do not belong to
any framework that has already (re-)registered.

Review: https://reviews.apache.org/r/22223


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf6b541c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf6b541c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf6b541c

Branch: refs/heads/master
Commit: cf6b541c907b1fa35cbf83532e47dde73a1e5500
Parents: a8c8b89
Author: Yifan Gu <yi...@mesosphere.io>
Authored: Mon Jun 16 14:01:55 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Jun 16 14:03:08 2014 -0700

----------------------------------------------------------------------
 src/master/http.cpp        |  38 +++++++++
 src/tests/master_tests.cpp | 169 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 207 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cf6b541c/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index b565dc6..5d86976 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -540,6 +540,44 @@ Future<Response> Master::Http::state(const Request& request)
     object.values["completed_frameworks"] = array;
   }
 
+  // Model all of the orphan tasks.
+  {
+    JSON::Array array;
+
+    // Find those orphan tasks.
+    foreachvalue (const Slave* slave, master.slaves.activated) {
+      typedef hashmap<TaskID, Task*> TaskMap;
+      foreachvalue (const TaskMap& tasks, slave->tasks) {
+        foreachvalue (const Task* task, tasks) {
+          CHECK_NOTNULL(task);
+          if (!master.frameworks.activated.contains(task->framework_id())) {
+            array.values.push_back(model(*task));
+          }
+        }
+      }
+    }
+
+    object.values["orphan_tasks"] = array;
+  }
+
+  // Model all currently unregistered frameworks.
+  // This could happen when the framework has yet to re-register
+  // after master failover.
+  {
+    JSON::Array array;
+
+    // Find unregistered frameworks.
+    foreachvalue (const Slave* slave, master.slaves.activated) {
+      foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+        if (!master.frameworks.activated.contains(frameworkId)) {
+          array.values.push_back(frameworkId.value());
+        }
+      }
+    }
+
+    object.values["unregistered_frameworks"] = array;
+  }
+
   return OK(object, request.query.get("jsonp"));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf6b541c/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 34df121..edcaa75 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -82,6 +82,7 @@ using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
 using testing::Return;
+using testing::SaveArg;
 
 // Those of the overall Mesos master/slave/scheduler/driver tests
 // that seem vaguely more master than slave-related are in this file.
@@ -1919,3 +1920,171 @@ TEST_F(MasterZooKeeperTest, LostZooKeeperCluster)
 }
 
 #endif // MESOS_HAS_JAVA
+
+
+// This test ensures that when a master fails over, tasks belonging
+// to frameworks that have not yet re-registered appear in the
+// "orphan_tasks" field of state.json, and that those frameworks
+// appear in the "unregistered_frameworks" field.
+TEST_F(MasterTest, OrphanTasks)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  StandaloneMasterDetector detector (master.get());
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+  ASSERT_SOME(slave);
+
+  // Create a task on the slave.
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId))
+    .WillRepeatedly(Return()); // Ignore subsequent events.
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Get the master's state.
+  Future<process::http::Response> response =
+    process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  JSON::Object state = parse.get();
+  // Record the original framework and task info.
+  JSON::Array frameworks =
+    state.values["frameworks"].as<JSON::Array>();
+  JSON::Object activeFramework =
+    frameworks.values.front().as<JSON::Object>();
+  JSON::String activeFrameworkId =
+    activeFramework.values["id"].as<JSON::String>();
+  JSON::Array activeTasks =
+    activeFramework.values["tasks"].as<JSON::Array>();
+  JSON::Array orphanTasks =
+    state.values["orphan_tasks"].as<JSON::Array>();
+  JSON::Array unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+
+  EXPECT_EQ(1u, frameworks.values.size());
+  EXPECT_EQ(1u, activeTasks.values.size());
+  EXPECT_EQ(0u, orphanTasks.values.size());
+  EXPECT_EQ(0u, unknownFrameworksArray.values.size());
+  EXPECT_EQ(frameworkId.value(), activeFrameworkId.value);
+
+  EXPECT_CALL(sched, disconnected(&driver))
+    .Times(1);
+
+  // Stop the master.
+  Stop(master.get());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
+  // Drop the ReregisterFrameworkMessage to delay the framework's
+  // re-registration.
+  Future<ReregisterFrameworkMessage> reregisterFrameworkMessage =
+    DROP_PROTOBUF(ReregisterFrameworkMessage(), _, master.get());
+
+  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), master.get(), _);
+
+  Clock::pause();
+
+  // Fail over to a new master.
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Simulate a new-master-detected event for the slave and the framework.
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+  AWAIT_READY(reregisterFrameworkMessage);
+
+  // Get the master's state.
+  response = process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  // Verify that we have some orphan tasks and unregistered
+  // frameworks.
+  state = parse.get();
+  orphanTasks = state.values["orphan_tasks"].as<JSON::Array>();
+  EXPECT_EQ(activeTasks, orphanTasks);
+
+  unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+  EXPECT_EQ(1u, unknownFrameworksArray.values.size());
+
+  JSON::String unknownFrameworkId =
+    unknownFrameworksArray.values.front().as<JSON::String>();
+  EXPECT_EQ(activeFrameworkId, unknownFrameworkId);
+
+  // Advance the clock to let the framework re-register with the master.
+  Clock::advance(Seconds(1));
+  Clock::settle();
+  Clock::resume();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  // Get the master's state.
+  response = process::http::get(master.get(), "state.json");
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  // Verify the orphan tasks and unregistered frameworks are removed.
+  state = parse.get();
+  unknownFrameworksArray =
+    state.values["unregistered_frameworks"].as<JSON::Array>();
+  EXPECT_EQ(0u, unknownFrameworksArray.values.size());
+
+  orphanTasks = state.values["orphan_tasks"].as<JSON::Array>();
+  EXPECT_EQ(0u, orphanTasks.values.size());
+
+  // Cleanup.
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}