You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/02/28 23:31:58 UTC

git commit: Added completed frameworks/tasks to slave re-registration.

Repository: mesos
Updated Branches:
  refs/heads/master 7bc952510 -> bcd4dc19e


Added completed frameworks/tasks to slave re-registration.

Fixes MESOS-767.

Review: https://reviews.apache.org/r/16724


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bcd4dc19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bcd4dc19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bcd4dc19

Branch: refs/heads/master
Commit: bcd4dc19e10c4b54b90fae713d14477b849147c2
Parents: 7bc9525
Author: Adam B <ad...@mesosphere.io>
Authored: Fri Feb 28 14:31:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Feb 28 14:31:36 2014 -0800

----------------------------------------------------------------------
 include/mesos/scheduler.hpp         |   2 +-
 src/master/master.cpp               |  50 ++++++++-
 src/master/master.hpp               |  21 +++-
 src/messages/messages.proto         |  15 +++
 src/slave/slave.cpp                 |  35 ++++++-
 src/tests/fault_tolerance_tests.cpp | 170 ++++++++++++++++++++++++++++++-
 src/tests/mesos.hpp                 |  28 +++--
 7 files changed, 298 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 55db177..85db111 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -410,7 +410,7 @@ private:
   // URL for the master (e.g., zk://, file://, etc).
   std::string url;
 
-  // Mutex to enforce all non-callbacks are execute serially.
+  // Mutex to enforce all non-callbacks are executed serially.
   pthread_mutex_t mutex;
 
   // Condition variable for waiting until driver terminates.

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2e86a19..f7ba9aa 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -443,7 +443,8 @@ void Master::initialize()
       &ReregisterSlaveMessage::slave_id,
       &ReregisterSlaveMessage::slave,
       &ReregisterSlaveMessage::executor_infos,
-      &ReregisterSlaveMessage::tasks);
+      &ReregisterSlaveMessage::tasks,
+      &ReregisterSlaveMessage::completed_frameworks);
 
   install<UnregisterSlaveMessage>(
       &Master::unregisterSlave,
@@ -1003,7 +1004,7 @@ void Master::reregisterFramework(
       }
     }
 
-    // N.B. Need to add the framwwork _after_ we add it's tasks
+    // N.B. Need to add the framework _after_ we add its tasks
     // (above) so that we can properly determine the resources it's
     // currently using!
     addFramework(framework);
@@ -1866,7 +1867,8 @@ void Master::reregisterSlave(
     const SlaveID& slaveId,
     const SlaveInfo& slaveInfo,
     const vector<ExecutorInfo>& executorInfos,
-    const vector<Task>& tasks)
+    const vector<Task>& tasks,
+    const vector<Archive::Framework>& completedFrameworks_)
 {
   if (!elected()) {
     LOG(WARNING) << "Ignoring re-register slave message from "
@@ -1951,7 +1953,7 @@ void Master::reregisterSlave(
     LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
         << slave->pid << " (" << slave->info.hostname() << ")";
 
-    readdSlave(slave, executorInfos, tasks);
+    readdSlave(slave, executorInfos, tasks, completedFrameworks_);
   }
 
   // Send the latest framework pids to the slave.
@@ -2876,7 +2878,8 @@ void Master::addSlave(Slave* slave, bool reregister)
 
 void Master::readdSlave(Slave* slave,
     const vector<ExecutorInfo>& executorInfos,
-    const vector<Task>& tasks)
+    const vector<Task>& tasks,
+    const vector<Archive::Framework>& completedFrameworks_)
 {
   CHECK_NOTNULL(slave);
 
@@ -2941,10 +2944,47 @@ void Master::readdSlave(Slave* slave,
     resources[task.framework_id()] += task.resources();
   }
 
+  foreach (const Archive::Framework& completedFramework_,
+           completedFrameworks_) {
+    LOG(INFO) << "Re-add completed framework "
+              << completedFramework_.framework_info().id()
+              << " from slave " << slave->id << " ("
+              << slave->info.hostname() << ")";
+    readdCompletedFramework(completedFramework_);
+  }
+
   allocator->slaveAdded(slave->id, slave->info, resources);
 }
 
 
+void Master::readdCompletedFramework(
+    const Archive::Framework& completedFramework_)
+{
+  const FrameworkInfo& frameworkInfo = completedFramework_.framework_info();
+  Option<shared_ptr<Framework> > framework = None();
+  foreach (const shared_ptr<Framework>& completedFramework,
+           completedFrameworks) {
+    if (completedFramework->id == frameworkInfo.id()) {
+      framework = completedFramework;
+      break;
+    }
+  }
+
+  if (framework.isNone()) {
+    UPID pid = completedFramework_.pid();
+    framework = shared_ptr<Framework>(
+        new Framework(frameworkInfo, frameworkInfo.id(), pid));
+    VLOG(1) << "Re-adding completed framework " << framework.get()->id;
+    completedFrameworks.push_back(framework.get());
+  }
+
+  foreach (const Task& task, completedFramework_.tasks()) {
+    VLOG(2) << "Re-adding completed task " << task.task_id();
+    framework.get()->addCompletedTask(task);
+  }
+}
+
+
 // Lose all of a slave's tasks and delete the slave object.
 void Master::removeSlave(Slave* slave)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 72525d2..49a3e15 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -143,7 +143,9 @@ public:
       const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
       const std::vector<ExecutorInfo>& executorInfos,
-      const std::vector<Task>& tasks);
+      const std::vector<Task>& tasks,
+      const std::vector<Archive::Framework>& completedFrameworks);
+
   void unregisterSlave(
       const SlaveID& slaveId);
   void statusUpdate(
@@ -243,8 +245,11 @@ protected:
   void addSlave(Slave* slave, bool reregister = false);
 
   void readdSlave(Slave* slave,
-		  const std::vector<ExecutorInfo>& executorInfos,
-		  const std::vector<Task>& tasks);
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<Archive::Framework>& completedFrameworks);
+
+  void readdCompletedFramework(const Archive::Framework& completedFramework);
 
   // Lose all of a slave's tasks and delete the slave object
   void removeSlave(Slave* slave);
@@ -465,7 +470,7 @@ struct Slave
   }
 
   bool hasExecutor(const FrameworkID& frameworkId,
-		   const ExecutorID& executorId) const
+                   const ExecutorID& executorId) const
   {
     return executors.contains(frameworkId) &&
       executors.get(frameworkId).get().contains(executorId);
@@ -543,7 +548,7 @@ struct Framework
   Framework(const FrameworkInfo& _info,
             const FrameworkID& _id,
             const process::UPID& _pid,
-            const process::Time& time)
+            const process::Time& time = process::Clock::now())
     : id(_id),
       info(_info),
       pid(_pid),
@@ -584,6 +589,12 @@ struct Framework
     resources -= task->resources();
   }
 
+  void addCompletedTask(const Task& task)
+  {
+    // TODO(adam-mesos): Check if completed task already exists.
+    completedTasks.push_back(memory::shared_ptr<Task>(new Task(task)));
+  }
+
   void addOffer(Offer* offer)
   {
     CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 922a8c4..c26a3d0 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -228,6 +228,7 @@ message ReregisterSlaveMessage {
   required SlaveInfo slave = 2;
   repeated ExecutorInfo executor_infos = 4;
   repeated Task tasks = 3;
+  repeated Archive.Framework completed_frameworks = 5;
 }
 
 
@@ -364,3 +365,17 @@ message AuthenticationErrorMessage {
   optional string error = 1;
 }
 
+
+// TODO(adam-mesos): Move this to an 'archive' package.
+/**
+ * Describes Completed Frameworks, etc. for archival.
+ */
+message Archive {
+  message Framework {
+    required FrameworkInfo framework_info = 1;
+    optional string pid = 2;
+    repeated Task tasks = 3;
+  }
+  repeated Framework frameworks = 1;
+}
+

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4f5349b..b350df4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -554,7 +554,7 @@ void Slave::doReliableRegistration()
     message.mutable_slave()->CopyFrom(info);
     message.mutable_slave()->mutable_id()->CopyFrom(info.id());
 
-    foreachvalue (Framework* framework, frameworks){
+    foreachvalue (Framework* framework, frameworks) {
       foreachvalue (Executor* executor, framework->executors) {
         // Ignore terminated executors because they do not consume
         // any resources.
@@ -596,6 +596,39 @@ void Slave::doReliableRegistration()
       }
     }
 
+    // Add completed frameworks.
+    foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
+      VLOG(1) << "Reregistering completed framework "
+                << completedFramework->id;
+      Archive::Framework* completedFramework_ =
+        message.add_completed_frameworks();
+      FrameworkInfo* frameworkInfo =
+        completedFramework_->mutable_framework_info();
+      frameworkInfo->CopyFrom(completedFramework->info);
+
+      // TODO(adam-mesos): Needed because FrameworkInfo doesn't have the id.
+      frameworkInfo->mutable_id()->CopyFrom(completedFramework->id);
+
+      completedFramework_->set_pid(completedFramework->pid);
+
+      foreach (const Owned<Executor>& executor,
+               completedFramework->completedExecutors) {
+        VLOG(2) << "Reregistering completed executor " << executor->id
+                << " with " << executor->terminatedTasks.size()
+                << " terminated tasks, " << executor->completedTasks.size()
+                << " completed tasks";
+        foreach (const Task* task, executor->terminatedTasks.values()) {
+          VLOG(2) << "Reregistering terminated task " << task->task_id();
+          completedFramework_->add_tasks()->CopyFrom(*task);
+        }
+        foreach (const memory::shared_ptr<Task>& task,
+                 executor->completedTasks) {
+          VLOG(2) << "Reregistering completed task " << task->task_id();
+          completedFramework_->add_tasks()->CopyFrom(*task);
+        }
+      }
+    }
+
     CHECK_SOME(master);
     send(master.get(), message);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 59632b0..3f4796a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/http.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
@@ -63,6 +64,8 @@ using process::Message;
 using process::Owned;
 using process::PID;
 using process::UPID;
+using process::http::OK;
+using process::http::Response;
 
 using std::string;
 using std::map;
@@ -655,6 +658,169 @@ TEST_F(FaultToleranceTest, MasterFailover)
 }
 
 
+// TODO(adam-mesos): Use real JSON parser (see stout/json.hpp TODOs).
+bool isJsonValueEmpty(const string& text, const string& key)
+{
+  string sub = text.substr(text.find(key));
+  size_t index = sub.find(":") + 1;
+  // This will not retrieve the entire value if the value is
+  // an object/array since it will stop at the first comma,
+  // but this is good enough to figure out if the value is empty.
+  return (sub.substr(index, sub.find(",") - index) == "[]");
+}
+
+
+// This test ensures that a recovering master recovers completed frameworks
+// and tasks from a slave's re-registration.
+TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
+{
+  // Step 1. Start Master and Slave.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor executor(DEFAULT_EXECUTOR_ID);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+
+  Try<PID<Slave> > slave =
+    StartSlave(&executor, Owned<MasterDetector>(detector));
+  ASSERT_SOME(slave);
+
+  // Verify master/slave have 0 completed/running frameworks.
+  Future<Response> masterState = process::http::get(master.get(), "state.json");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, masterState);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+      "application/json",
+      "Content-Type",
+      masterState);
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  // Step 2. Create/start framework.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  EXPECT_NE("", frameworkId.get().value());
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Step 3. Create/launch a task.
+  TaskInfo task = createTask(offers.get()[0], "exit 1", DEFAULT_EXECUTOR_ID);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Short-lived task.
+
+  EXPECT_CALL(executor, registered(_, _, _, _));
+  EXPECT_CALL(executor, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  // Verify master and slave recognize the running task/framework.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Future<Response> slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 4. Kill task.
+  EXPECT_CALL(executor, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(statusKilled);
+  ASSERT_EQ(TASK_KILLED, statusKilled.get().state());
+
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 5. Stop the framework, shutdown executor.
+  Future<Nothing> shutdown;
+  EXPECT_CALL(executor, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(executorTerminated);
+  AWAIT_READY(shutdown);
+
+  // Verify master sees completed framework.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  // Slave received message to shutdown the framework,
+  // need to wait for it to actually happen.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // Verify slave sees completed framework.
+  slaveState = process::http::get(slave.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(slaveState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(slaveState.get().body, "\"frameworks\""));
+
+  // Step 6. Simulate failed over master by restarting the master.
+  Stop(master.get());
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Verify new master knows of no running/completed frameworks.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a new master detected message to the slave.
+  detector->appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Verify completed framework/task in new Master.
+  masterState = process::http::get(master.get(), "state.json");
+  EXPECT_FALSE(isJsonValueEmpty(masterState.get().body, "completed_frameworks"));
+  EXPECT_TRUE(isJsonValueEmpty(masterState.get().body, "\"frameworks\""));
+
+  Shutdown();
+}
+
+
 TEST_F(FaultToleranceTest, SchedulerFailover)
 {
   Try<PID<Master> > master = StartMaster();
@@ -1038,7 +1204,9 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
   // Drop the first status update message
   // between master and the scheduler.
   Future<StatusUpdateMessage> statusUpdateMessage =
-    DROP_PROTOBUF(StatusUpdateMessage(), _, Not(AnyOf(Eq(master.get()), Eq(slave.get()))));
+    DROP_PROTOBUF(StatusUpdateMessage(),
+                  _,
+                  Not(AnyOf(Eq(master.get()), Eq(slave.get()))));
 
   driver1.launchTasks(offers.get()[0].id(), tasks);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd4dc19/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 018d4ff..f77fbfe 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -224,22 +224,30 @@ protected:
         credential; })
 
 
-#define DEFAULT_EXECUTOR_ID						\
+#define DEFAULT_EXECUTOR_ID           \
       DEFAULT_EXECUTOR_INFO.executor_id()
 
 
 inline TaskInfo createTask(
     const Offer& offer,
     const std::string& command,
+    const Option<mesos::ExecutorID>& executorId = None(),
     const std::string& name = "test-task",
     const std::string& id = UUID::random().toString())
 {
   TaskInfo task;
   task.set_name(name);
   task.mutable_task_id()->set_value(id);
-  task.mutable_slave_id()->MergeFrom(offer.slave_id());
-  task.mutable_resources()->MergeFrom(offer.resources());
-  task.mutable_command()->set_value(command);
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+  if (executorId.isSome()) {
+    ExecutorInfo executor;
+    executor.mutable_executor_id()->CopyFrom(executorId.get());
+    executor.mutable_command()->set_value(command);
+    task.mutable_executor()->CopyFrom(executor);
+  } else {
+    task.mutable_command()->set_value(command);
+  }
 
   return task;
 }
@@ -651,18 +659,18 @@ public:
     : cpus(_cpus), mem(_mem) {}
 
   virtual bool MatchAndExplain(const std::vector<Offer>& offers,
-			       ::testing::MatchResultListener* listener) const
+                               ::testing::MatchResultListener* listener) const
   {
     double totalCpus = 0;
     double totalMem = 0;
 
     foreach (const Offer& offer, offers) {
       foreach (const Resource& resource, offer.resources()) {
-	if (resource.name() == "cpus") {
-	  totalCpus += resource.scalar().value();
-	} else if (resource.name() == "mem") {
-	  totalMem += resource.scalar().value();
-	}
+        if (resource.name() == "cpus") {
+          totalCpus += resource.scalar().value();
+        } else if (resource.name() == "mem") {
+          totalMem += resource.scalar().value();
+        }
       }
     }