You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/06/07 03:05:14 UTC
[3/3] git commit: Added "implicit" reconciliation.
Added "implicit" reconciliation.
Review: https://reviews.apache.org/r/22268
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5172630a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5172630a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5172630a
Branch: refs/heads/master
Commit: 5172630ae73b7c5f21e1d0e1840a3dc676816582
Parents: 87de508
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jun 5 09:49:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 6 17:51:03 2014 -0700
----------------------------------------------------------------------
include/mesos/scheduler.hpp | 9 +-
include/mesos/scheduler/scheduler.proto | 11 +-
.../src/org/apache/mesos/SchedulerDriver.java | 12 +-
src/master/master.cpp | 38 ++++-
src/messages/messages.proto | 8 +-
src/python/src/mesos.py | 1 +
src/tests/reconciliation_tests.cpp | 148 ++++++++++++++++++-
7 files changed, 212 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index f8424e3..d224945 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -308,9 +308,12 @@ public:
const std::string& data) = 0;
/**
- * Causes the master to send back the latest task status for each
- * task in 'statuses', if possible. Tasks that are no longer known
- * will result in a TASK_LOST update.
+ * Allows the framework to query the status for non-terminal tasks.
+ * This causes the master to send back the latest task status for
+ * each task in 'statuses', if possible. Tasks that are no longer
+ * known will result in a TASK_LOST update. If statuses is empty,
+ * then the master will send the latest status for each task
+ * currently known.
*/
virtual Status reconcileTasks(
const std::vector<TaskStatus>& statuses) = 0;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 4deda55..6ab5089 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -160,9 +160,16 @@ message Call {
required bytes uuid = 3;
}
+ // Allows the framework to query the status for non-terminal tasks.
+ // This causes the master to send back the latest task status for
+ // each task in 'statuses', if possible. Tasks that are no longer
+ // known will result in a TASK_LOST update. If statuses is empty,
+ // then the master will send the latest status for each task
+ // currently known.
+ // TODO(bmahler): Add a guiding document for reconciliation or
+ // document reconciliation in-depth here.
message Reconcile {
- // TODO(benh): Send TaskID's instead, see MESOS-1453.
- repeated TaskStatus statuses = 1;
+ repeated TaskStatus statuses = 1; // Should be non-terminal only.
}
message Message {
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index ae2d915..591984e 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -212,10 +212,14 @@ public interface SchedulerDriver {
byte[] data);
/**
- * Reconciliation of tasks causes the master to send status updates for tasks
- * whose status differs from the status sent here.
- *
- * @param statuses The collection of tasks and statuses to reconcile.
+ * Allows the framework to query the status for non-terminal tasks.
+ * This causes the master to send back the latest task status for
+ * each task in 'statuses', if possible. Tasks that are no longer
+ * known will result in a TASK_LOST update. If statuses is empty,
+ * then the master will send the latest status for each task
+ * currently known.
+ *
+ * @param statuses The collection of non-terminal TaskStatuses to reconcile.
* @return The state of the driver after the call.
*/
Status reconcileTasks(Collection<TaskStatus> statuses);
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b13773..c18ccc4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2735,10 +2735,41 @@ void Master::reconcileTasks(
return;
}
- LOG(INFO) << "Performing task state reconciliation for " << statuses.size()
- << " task statuses of framework " << frameworkId;
+ if (from != framework->pid) {
+ LOG(WARNING)
+ << "Ignoring reconcile tasks message for framework " << frameworkId
+ << " from '" << from << "' because it is not from the registered"
+ << " framework '" << framework->pid << "'";
+ return;
+ }
+
+ if (statuses.empty()) {
+ // Implicit reconciliation.
+ LOG(INFO) << "Performing implicit task state reconciliation for framework "
+ << frameworkId;
+
+ // TODO(bmahler): Consider sending completed tasks?
+ foreachvalue (Task* task, framework->tasks) {
+ StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ task->slave_id(),
+ task->task_id(),
+ task->state(),
+ "Reconciliation: Latest task state");
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+ }
+
+ return;
+ }
+
+ // Explicit reconciliation.
+ LOG(INFO) << "Performing explicit task state reconciliation for "
+ << statuses.size() << " task statuses of framework " << frameworkId;
- // Reconciliation occurs for the following cases:
+ // Explicit reconciliation occurs for the following cases:
// (1) If the slave is unknown, we send TASK_LOST.
// (2) If the task is missing on the slave, we send TASK_LOST.
// (3) Otherwise, we send the latest state.
@@ -2796,7 +2827,6 @@ void Master::reconcileTasks(
}
if (update.isSome()) {
- CHECK_NOTNULL(framework);
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update.get());
send(framework->pid, message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 321e250..8aecc8b 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -208,9 +208,15 @@ message LostSlaveMessage {
}
+// Allows the framework to query the status for non-terminal tasks.
+// This causes the master to send back the latest task status for
+// each task in 'statuses', if possible. Tasks that are no longer
+// known will result in a TASK_LOST update. If statuses is empty,
+// then the master will send the latest status for each task
+// currently known.
message ReconcileTasksMessage {
required FrameworkID framework_id = 1;
- repeated TaskStatus statuses = 2;
+ repeated TaskStatus statuses = 2; // Should be non-terminal only.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/python/src/mesos.py
----------------------------------------------------------------------
diff --git a/src/python/src/mesos.py b/src/python/src/mesos.py
index f44751e..a9764fb 100644
--- a/src/python/src/mesos.py
+++ b/src/python/src/mesos.py
@@ -234,6 +234,7 @@ class SchedulerDriver(object):
retransmitted in any reliable fashion.
"""
+ # TODO(bmahler): Add reconcileTasks!
class Executor(object):
"""
http://git-wip-us.apache.org/repos/asf/mesos/blob/5172630a/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 3574388..6edbf75 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -195,7 +195,7 @@ TEST_F(ReconciliationTest, TaskStateMatch)
const TaskID taskId = update.get().task_id();
const SlaveID slaveId = update.get().slave_id();
- // Framework should not receive a status udpate.
+ // Framework should not receive a status update.
EXPECT_CALL(sched, statusUpdate(&driver, _))
.Times(0);
@@ -485,3 +485,149 @@ TEST_F(ReconciliationTest, SlaveInTransition)
Shutdown();
}
+
+
+// This test ensures that an implicit reconciliation request results
+// in updates for all non-terminal tasks known to the master.
+// An implicit request is a reconcileTasks() call made with an empty
+// 'statuses' vector.
+TEST_F(ReconciliationTest, ImplicitNonTerminalTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ // Launch a framework and get a task running.
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ // Launch the task from the first offer only; later offers are ignored.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ // The mock executor answers the launch with a TASK_RUNNING update,
+ // so the task stays non-terminal for the rest of the test.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Capture the initial (non-reconciliation) status update.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ driver.start();
+
+ // Wait until the framework is registered.
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_RUNNING, update.get().state());
+ EXPECT_TRUE(update.get().has_slave_id());
+
+ // When making an implicit reconciliation request, the non-terminal
+ // task should be sent back. This expectation is declared after the
+ // first one, so it captures the next status update.
+ Future<TaskStatus> update2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update2));
+
+ // An empty 'statuses' vector requests implicit reconciliation.
+ vector<TaskStatus> statuses;
+ driver.reconcileTasks(statuses);
+
+ // The reconciliation update should carry the task's latest state.
+ AWAIT_READY(update2);
+ EXPECT_EQ(TASK_RUNNING, update2.get().state());
+
+ // Tolerate (at most) one executor shutdown during teardown.
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test ensures that the master does not send updates for
+// terminal tasks during an implicit reconciliation request.
+// TODO(bmahler): Soon the master will keep non-acknowledged
+// tasks, and this test may break (presumably because the finished
+// task would then still be known to the master and be reported by
+// implicit reconciliation; revisit this expectation when that lands).
+TEST_F(ReconciliationTest, ImplicitTerminalTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ // Launch a framework and drive a task to a terminal state.
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ // Launch the task from the first offer only; later offers are ignored.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ // The mock executor answers the launch with TASK_FINISHED, so the
+ // task becomes terminal right away.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ // Capture the (only expected) TASK_FINISHED update.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ driver.start();
+
+ // Wait until the framework is registered.
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_FINISHED, update.get().state());
+ EXPECT_TRUE(update.get().has_slave_id());
+
+ // Framework should not receive any further updates; in particular,
+ // implicit reconciliation must not report the finished task.
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(0);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ // Intercept the reconcile message so the test can wait until the
+ // master has received it.
+ Future<ReconcileTasksMessage> reconcileTasksMessage =
+ FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+
+ // Pause the clock so Clock::settle() below can be used to wait for
+ // pending events to be processed.
+ Clock::pause();
+
+ // When making an implicit reconciliation request, the master
+ // should not send back terminal tasks.
+ vector<TaskStatus> statuses;
+ driver.reconcileTasks(statuses);
+
+ // Make sure the master received the reconcile tasks message.
+ AWAIT_READY(reconcileTasksMessage);
+
+ // The Clock::settle() will ensure that framework would receive
+ // a status update if it is sent by the master. In this test it
+ // shouldn't receive any.
+ Clock::settle();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}