You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/06/07 03:05:14 UTC

[3/3] git commit: Added "implicit" reconciliation.

Added "implicit" reconciliation.

Review: https://reviews.apache.org/r/22268


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5172630a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5172630a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5172630a

Branch: refs/heads/master
Commit: 5172630ae73b7c5f21e1d0e1840a3dc676816582
Parents: 87de508
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jun 5 09:49:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 6 17:51:03 2014 -0700

----------------------------------------------------------------------
 include/mesos/scheduler.hpp                     |   9 +-
 include/mesos/scheduler/scheduler.proto         |  11 +-
 .../src/org/apache/mesos/SchedulerDriver.java   |  12 +-
 src/master/master.cpp                           |  38 ++++-
 src/messages/messages.proto                     |   8 +-
 src/python/src/mesos.py                         |   1 +
 src/tests/reconciliation_tests.cpp              | 148 ++++++++++++++++++-
 7 files changed, 212 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index f8424e3..d224945 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -308,9 +308,12 @@ public:
                                       const std::string& data) = 0;
 
   /**
-   * Causes the master to send back the latest task status for each
-   * task in 'statuses', if possible. Tasks that are no longer known
-   * will result in a TASK_LOST update.
+   * Allows the framework to query the status for non-terminal tasks.
+   * This causes the master to send back the latest task status for
+   * each task in 'statuses', if possible. Tasks that are no longer
+   * known will result in a TASK_LOST update. If statuses is empty,
+   * then the master will send the latest status for each task
+   * currently known.
    */
   virtual Status reconcileTasks(
       const std::vector<TaskStatus>& statuses) = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 4deda55..6ab5089 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -160,9 +160,16 @@ message Call {
     required bytes uuid = 3;
   }
 
+  // Allows the framework to query the status for non-terminal tasks.
+  // This causes the master to send back the latest task status for
+  // each task in 'statuses', if possible. Tasks that are no longer
+  // known will result in a TASK_LOST update. If statuses is empty,
+  // then the master will send the latest status for each task
+  // currently known.
+  // TODO(bmahler): Add a guiding document for reconciliation or
+  // document reconciliation in-depth here.
   message Reconcile {
-    // TODO(benh): Send TaskID's instead, see MESOS-1453.
-    repeated TaskStatus statuses = 1;
+    repeated TaskStatus statuses = 1; // Should be non-terminal only.
   }
 
   message Message {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index ae2d915..591984e 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -212,10 +212,14 @@ public interface SchedulerDriver {
                               byte[] data);
 
   /**
-   * Reconciliation of tasks causes the master to send status updates for tasks
-   * whose status differs from the status sent here.
-   *
-   * @param statuses The collection of tasks and statuses to reconcile.
+   * Allows the framework to query the status for non-terminal tasks.
+   * This causes the master to send back the latest task status for
+   * each task in 'statuses', if possible. Tasks that are no longer
+   * known will result in a TASK_LOST update. If statuses is empty,
+   * then the master will send the latest status for each task
+   * currently known.
+   *
+   * @param statuses The collection of non-terminal TaskStatuses to reconcile.
    * @return The state of the driver after the call.
    */
   Status reconcileTasks(Collection<TaskStatus> statuses);

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b13773..c18ccc4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2735,10 +2735,41 @@ void Master::reconcileTasks(
     return;
   }
 
-  LOG(INFO) << "Performing task state reconciliation for " << statuses.size()
-            << " task statuses of framework " << frameworkId;
+  if (from != framework->pid) {
+    LOG(WARNING)
+      << "Ignoring reconcile tasks message for framework " << frameworkId
+      << " from '" << from << "' because it is not from the registered"
+      << " framework '" << framework->pid << "'";
+    return;
+  }
+
+  if (statuses.empty()) {
+    // Implicit reconciliation.
+    LOG(INFO) << "Performing implicit task state reconciliation for framework "
+              << frameworkId;
+
+    // TODO(bmahler): Consider sending completed tasks?
+    foreachvalue (Task* task, framework->tasks) {
+      StatusUpdate update = protobuf::createStatusUpdate(
+          frameworkId,
+          task->slave_id(),
+          task->task_id(),
+          task->state(),
+          "Reconciliation: Latest task state");
+
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
+    return;
+  }
+
+  // Explicit reconciliation.
+  LOG(INFO) << "Performing explicit task state reconciliation for "
+            << statuses.size() << " task statuses of framework " << frameworkId;
 
-  // Reconciliation occurs for the following cases:
+  // Explicit reconciliation occurs for the following cases:
   //   (1) If the slave is unknown, we send TASK_LOST.
   //   (2) If the task is missing on the slave, we send TASK_LOST.
   //   (3) Otherwise, we send the latest state.
@@ -2796,7 +2827,6 @@ void Master::reconcileTasks(
     }
 
     if (update.isSome()) {
-      CHECK_NOTNULL(framework);
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update.get());
       send(framework->pid, message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 321e250..8aecc8b 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -208,9 +208,15 @@ message LostSlaveMessage {
 }
 
 
+// Allows the framework to query the status for non-terminal tasks.
+// This causes the master to send back the latest task status for
+// each task in 'statuses', if possible. Tasks that are no longer
+// known will result in a TASK_LOST update. If statuses is empty,
+// then the master will send the latest status for each task
+// currently known.
 message ReconcileTasksMessage {
   required FrameworkID framework_id = 1;
-  repeated TaskStatus statuses = 2;
+  repeated TaskStatus statuses = 2; // Should be non-terminal only.
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/python/src/mesos.py
----------------------------------------------------------------------
diff --git a/src/python/src/mesos.py b/src/python/src/mesos.py
index f44751e..a9764fb 100644
--- a/src/python/src/mesos.py
+++ b/src/python/src/mesos.py
@@ -234,6 +234,7 @@ class SchedulerDriver(object):
       retransmitted in any reliable fashion.
     """
 
+  # TODO(bmahler): Add reconcileTasks!
 
 class Executor(object):
   """

http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 3574388..6edbf75 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -195,7 +195,7 @@ TEST_F(ReconciliationTest, TaskStateMatch)
   const TaskID taskId = update.get().task_id();
   const SlaveID slaveId = update.get().slave_id();
 
-  // Framework should not receive a status udpate.
+  // Framework should not receive a status update.
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .Times(0);
 
@@ -485,3 +485,149 @@ TEST_F(ReconciliationTest, SlaveInTransition)
 
   Shutdown();
 }
+
+
+// This test ensures that an implicit reconciliation request results
+// in updates for all non-terminal tasks known to the master.
+TEST_F(ReconciliationTest, ImplicitNonTerminalTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  // Launch a framework and get a task running.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  driver.start();
+
+  // Wait until the framework is registered.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_RUNNING, update.get().state());
+  EXPECT_TRUE(update.get().has_slave_id());
+
+  // When making an implicit reconciliation request, the latest
+  // status of the non-terminal task should be sent back.
+  Future<TaskStatus> update2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update2));
+
+  vector<TaskStatus> statuses;
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(update2);
+  EXPECT_EQ(TASK_RUNNING, update2.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test ensures that the master does not send updates for
+// terminal tasks during an implicit reconciliation request.
+// TODO(bmahler): Soon the master will keep non-acknowledged
+// tasks, and this test may break.
+TEST_F(ReconciliationTest, ImplicitTerminalTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  // Launch a framework and get a task into a terminal state.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  driver.start();
+
+  // Wait until the framework is registered.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update.get().state());
+  EXPECT_TRUE(update.get().has_slave_id());
+
+  // Framework should not receive any further updates.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Future<ReconcileTasksMessage> reconcileTasksMessage =
+    FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+
+  Clock::pause();
+
+  // When making an implicit reconciliation request, the master
+  // should not send back updates for terminal tasks.
+  vector<TaskStatus> statuses;
+  driver.reconcileTasks(statuses);
+
+  // Make sure the master received the reconcile tasks message.
+  AWAIT_READY(reconcileTasksMessage);
+
+  // Clock::settle() ensures that the framework would have received
+  // a status update had the master sent one. In this test it
+  // shouldn't receive any.
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}