You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:33 UTC
[06/11] mesos git commit: Updated RECONCILE call to optionally
specifiy a slave id.
Updated RECONCILE call to optionally specifiy a slave id.
Review: https://reviews.apache.org/r/32502/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f95fa119
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f95fa119
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f95fa119
Branch: refs/heads/master
Commit: f95fa119044c9a11c8473ab088e948e7e1c1334d
Parents: 978e72d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:54:36 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 20 +++--
src/master/master.cpp | 32 +++++++-
src/master/master.hpp | 4 +
src/scheduler/scheduler.cpp | 6 +-
src/tests/scheduler_tests.cpp | 107 +++++++++++++++++++++++++++
5 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 928995a..51bfe8d 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -176,16 +176,20 @@ message Call {
required bytes uuid = 3;
}
- // Allows the framework to query the status for non-terminal tasks.
+ // Allows the scheduler to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
- // each task in 'statuses', if possible. Tasks that are no longer
- // known will result in a TASK_LOST update. If statuses is empty,
- // then the master will send the latest status for each task
- // currently known.
- // TODO(bmahler): Add a guiding document for reconciliation or
- // document reconciliation in-depth here.
+ // each task in 'tasks', if possible. Tasks that are no longer known
+ // will result in a TASK_LOST update. If 'statuses' is empty, then
+ // the master will send the latest status for each task currently
+ // known.
message Reconcile {
- repeated TaskStatus statuses = 1; // Should be non-terminal only.
+ // TODO(vinod): Support arbitrary queries than just state of tasks.
+ message Task {
+ required TaskID task_id = 1;
+ optional SlaveID slave_id = 2;
+ }
+
+ repeated Task tasks = 1;
}
message Message {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c9c2cc2..e762d56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1603,9 +1603,16 @@ void Master::receive(
accept(framework, call.accept());
break;
+ case scheduler::Call::RECONCILE:
+ if (!call.has_reconcile()) {
+ drop(from, call, "Expecting 'reconcile' to be present");
+ return;
+ }
+ reconcile(framework, call.reconcile());
+ break;
+
case scheduler::Call::KILL:
case scheduler::Call::ACKNOWLEDGE:
- case scheduler::Call::RECONCILE:
case scheduler::Call::MESSAGE:
drop(from, call, "Unimplemented");
break;
@@ -3477,6 +3484,29 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
}
+void Master::reconcile(
+ Framework* framework,
+ const scheduler::Call::Reconcile& reconcile)
+{
+ CHECK_NOTNULL(framework);
+
+ // Construct 'TaskStatus'es from 'Reconcile::Task's.
+ vector<TaskStatus> statuses;
+ foreach (const scheduler::Call::Reconcile::Task& task, reconcile.tasks()) {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.set_state(TASK_RUNNING); // Dummy status.
+ if (task.has_slave_id()) {
+ status.mutable_slave_id()->CopyFrom(task.slave_id());
+ }
+
+ statuses.push_back(status);
+ }
+
+ _reconcileTasks(framework, statuses);
+}
+
+
void Master::reconcileTasks(
const UPID& from,
const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 59d6015..ec17a60 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -453,6 +453,10 @@ private:
const scheduler::Call::Accept& accept,
const process::Future<std::list<process::Future<bool>>>& authorizations);
+ void reconcile(
+ Framework* framework,
+ const scheduler::Call::Reconcile& reconcile);
+
bool elected() const
{
return leader.isSome() && leader.get() == info_;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index e80a0dc..d417442 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -314,10 +314,8 @@ public:
drop(call, "Expecting 'reconcile' to be present");
return;
}
- ReconcileTasksMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
- send(master.get(), message);
+
+ send(master.get(), call);
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4ea5528..4911920 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -211,6 +211,113 @@ TEST_F(SchedulerTest, TaskRunning)
}
+TEST_F(SchedulerTest, ReconcileTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ scheduler::Mesos mesos(
+ master.get(),
+ DEFAULT_CREDENTIAL,
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.set_type(Call::REGISTER);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+ FrameworkID id(event.get().registered().framework_id());
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_NE(0, event.get().offers().offers().size());
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Offer offer = event.get().offers().offers(0);
+ TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::RECONCILE);
+
+ Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
+ task->mutable_task_id()->CopyFrom(taskInfo.task_id());
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+ EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+ event.get().update().status().reason());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.