You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:33 UTC

[06/11] mesos git commit: Updated RECONCILE call to optionally specify a slave id.

Updated RECONCILE call to optionally specify a slave id.

Review: https://reviews.apache.org/r/32502/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f95fa119
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f95fa119
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f95fa119

Branch: refs/heads/master
Commit: f95fa119044c9a11c8473ab088e948e7e1c1334d
Parents: 978e72d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:54:36 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto |  20 +++--
 src/master/master.cpp                   |  32 +++++++-
 src/master/master.hpp                   |   4 +
 src/scheduler/scheduler.cpp             |   6 +-
 src/tests/scheduler_tests.cpp           | 107 +++++++++++++++++++++++++++
 5 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 928995a..51bfe8d 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -176,16 +176,20 @@ message Call {
     required bytes uuid = 3;
   }
 
-  // Allows the framework to query the status for non-terminal tasks.
+  // Allows the scheduler to query the status for non-terminal tasks.
   // This causes the master to send back the latest task status for
-  // each task in 'statuses', if possible. Tasks that are no longer
-  // known will result in a TASK_LOST update. If statuses is empty,
-  // then the master will send the latest status for each task
-  // currently known.
-  // TODO(bmahler): Add a guiding document for reconciliation or
-  // document reconciliation in-depth here.
+  // each task in 'tasks', if possible. Tasks that are no longer known
+  // will result in a TASK_LOST update. If 'tasks' is empty, then
+  // the master will send the latest status for each task currently
+  // known.
   message Reconcile {
-    repeated TaskStatus statuses = 1; // Should be non-terminal only.
+    // TODO(vinod): Support arbitrary queries rather than just state of tasks.
+    message Task {
+      required TaskID task_id = 1;
+      optional SlaveID slave_id = 2;
+    }
+
+    repeated Task tasks = 1;
   }
 
   message Message {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c9c2cc2..e762d56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1603,9 +1603,16 @@ void Master::receive(
       accept(framework, call.accept());
       break;
 
+    case scheduler::Call::RECONCILE:
+      if (!call.has_reconcile()) {
+        drop(from, call, "Expecting 'reconcile' to be present");
+        return;
+      }
+      reconcile(framework, call.reconcile());
+      break;
+
     case scheduler::Call::KILL:
     case scheduler::Call::ACKNOWLEDGE:
-    case scheduler::Call::RECONCILE:
     case scheduler::Call::MESSAGE:
       drop(from, call, "Unimplemented");
       break;
@@ -3477,6 +3484,29 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
 }
 
 
+void Master::reconcile(
+    Framework* framework,
+    const scheduler::Call::Reconcile& reconcile)
+{
+  CHECK_NOTNULL(framework);
+
+  // Construct 'TaskStatus'es from 'Reconcile::Task's.
+  vector<TaskStatus> statuses;
+  foreach (const scheduler::Call::Reconcile::Task& task, reconcile.tasks()) {
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(task.task_id());
+    status.set_state(TASK_RUNNING); // Dummy status.
+    if (task.has_slave_id()) {
+      status.mutable_slave_id()->CopyFrom(task.slave_id());
+    }
+
+    statuses.push_back(status);
+  }
+
+  _reconcileTasks(framework, statuses);
+}
+
+
 void Master::reconcileTasks(
     const UPID& from,
     const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 59d6015..ec17a60 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -453,6 +453,10 @@ private:
     const scheduler::Call::Accept& accept,
     const process::Future<std::list<process::Future<bool>>>& authorizations);
 
+  void reconcile(
+      Framework* framework,
+      const scheduler::Call::Reconcile& reconcile);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index e80a0dc..d417442 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -314,10 +314,8 @@ public:
           drop(call, "Expecting 'reconcile' to be present");
           return;
         }
-        ReconcileTasksMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
-        send(master.get(), message);
+
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4ea5528..4911920 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -211,6 +211,113 @@ TEST_F(SchedulerTest, TaskRunning)
 }
 
 
+TEST_F(SchedulerTest, ReconcileTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::RECONCILE);
+
+    Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
+    task->mutable_task_id()->CopyFrom(taskInfo.task_id());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+            event.get().update().status().reason());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.