You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:28 UTC

[01/11] mesos git commit: Removed REQUEST call from scheduler.proto.

Repository: mesos
Updated Branches:
  refs/heads/master ec0a9f340 -> 407af3eff


Removed REQUEST call from scheduler.proto.

Review: https://reviews.apache.org/r/32501/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/978e72d4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/978e72d4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/978e72d4

Branch: refs/heads/master
Commit: 978e72d4abc633850ea168e3f749dca4ca482ae8
Parents: 79086eb
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:10:32 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:46 2015 -1000

----------------------------------------------------------------------
 include/mesos/mesos.proto               |  2 ++
 include/mesos/scheduler/scheduler.proto |  6 ------
 src/master/master.cpp                   |  1 -
 src/master/master.hpp                   |  1 +
 src/scheduler/scheduler.cpp             | 12 ------------
 5 files changed, 3 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 3a8e8bf..967b1e3 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -609,6 +609,8 @@ message PerfStatistics {
  * to proactively influence the allocator.  If 'slave_id' is provided
  * then this request is assumed to only apply to resources on that
  * slave.
+ *
+ * TODO(vinod): Remove this once the old driver is removed.
  */
 message Request {
   optional SlaveID slave_id = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index ce401aa..928995a 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -117,7 +117,6 @@ message Call {
     REGISTER = 1;
     REREGISTER = 2;
     UNREGISTER = 3;
-    REQUEST = 4;
     REVIVE = 6;
     DECLINE = 5;
     ACCEPT = 12;
@@ -135,10 +134,6 @@ message Call {
     // something that is not an issue with the Event/Call API.
   }
 
-  message Request {
-    repeated mesos.Request requests = 1;
-  }
-
   message Decline {
     repeated OfferID offer_ids = 1;
     optional Filters filters = 2;
@@ -208,7 +203,6 @@ message Call {
   // present if that type has a nested message definition.
   required Type type = 2;
 
-  optional Request request = 3;
   optional Decline decline = 4;
   optional Accept accept = 10;
   optional Kill kill = 6;

http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 865ff89..c9c2cc2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1590,7 +1590,6 @@ void Master::receive(
 
   switch (call.type()) {
     case scheduler::Call::UNREGISTER:
-    case scheduler::Call::REQUEST:
     case scheduler::Call::REVIVE:
     case scheduler::Call::DECLINE:
       drop(from, call, "Unimplemented");

http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 550d2c5..59d6015 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -129,6 +129,7 @@ public:
       const process::UPID& from,
       const FrameworkID& frameworkId);
 
+  // TODO(vinod): Remove this once the old driver is removed.
   void resourceRequest(
       const process::UPID& from,
       const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 6fbd991..e80a0dc 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -235,18 +235,6 @@ public:
         break;
       }
 
-      case Call::REQUEST: {
-        if (!call.has_request()) {
-          drop(call, "Expecting 'request' to be present");
-          return;
-        }
-        ResourceRequestMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        message.mutable_requests()->CopyFrom(call.request().requests());
-        send(master.get(), message);
-        break;
-      }
-
       case Call::DECLINE: {
         if (!call.has_decline()) {
           drop(call, "Expecting 'decline' to be present");


[03/11] mesos git commit: Updated KILL to optionally include SlaveID.

Posted by vi...@apache.org.
Updated KILL to optionally include SlaveID.

Review: https://reviews.apache.org/r/32843


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a553a64
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a553a64
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a553a64

Branch: refs/heads/master
Commit: 6a553a6431662ddb24bef23f1c9cd54af3ebf865
Parents: d3c3269
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 10:42:12 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto |   1 +
 src/master/master.cpp                   |  45 ++++++++--
 src/master/master.hpp                   |   4 +
 src/scheduler/scheduler.cpp             |   5 +-
 src/tests/scheduler_tests.cpp           | 124 +++++++++++++++++++++++++++
 5 files changed, 169 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index f347912..5a94884 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -166,6 +166,7 @@ message Call {
 
   message Kill {
     required TaskID task_id = 1;
+    optional SlaveID slave_id = 2;
   }
 
   message Acknowledge {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e762d56..cc20be9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1612,6 +1612,12 @@ void Master::receive(
       break;
 
     case scheduler::Call::KILL:
+      if (!call.has_kill()) {
+        drop(from, call, "Expecting 'kill' to be present");
+      }
+      kill(framework, call.kill());
+      break;
+
     case scheduler::Call::ACKNOWLEDGE:
     case scheduler::Call::MESSAGE:
       drop(from, call, "Unimplemented");
@@ -2687,11 +2693,11 @@ void Master::killTask(
     const FrameworkID& frameworkId,
     const TaskID& taskId)
 {
-  ++metrics->messages_kill_task;
-
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
 
+  ++metrics->messages_kill_task;
+
   Framework* framework = getFramework(frameworkId);
 
   if (framework == NULL) {
@@ -2708,13 +2714,28 @@ void Master::killTask(
     return;
   }
 
+  scheduler::Call::Kill call;
+  call.mutable_task_id()->CopyFrom(taskId);
+
+  kill(framework, call);
+}
+
+
+void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
+{
+  CHECK_NOTNULL(framework);
+
+  const TaskID& taskId = kill.task_id();
+  const Option<SlaveID> slaveId =
+    kill.has_slave_id() ? Option<SlaveID>(kill.slave_id()) : None();
+
   if (framework->pendingTasks.contains(taskId)) {
     // Remove from pending tasks.
     framework->pendingTasks.erase(taskId);
 
     const StatusUpdate& update = protobuf::createStatusUpdate(
-        frameworkId,
-        None(),
+        framework->id(),
+        slaveId,
         taskId,
         TASK_KILLED,
         TaskStatus::SOURCE_MASTER,
@@ -2733,18 +2754,30 @@ void Master::killTask(
 
     TaskStatus status;
     status.mutable_task_id()->CopyFrom(taskId);
+    if (slaveId.isSome()) {
+      status.mutable_slave_id()->CopyFrom(slaveId.get());
+    }
 
     _reconcileTasks(framework, {status});
     return;
   }
 
+  if (slaveId.isSome() && !(slaveId.get() == task->slave_id())) {
+    LOG(WARNING) << "Cannot kill task " << taskId << " of slave "
+                 << slaveId.get() << " of framework " << *framework
+                 << " because it belongs to different slave "
+                 << task->slave_id();
+    // TODO(vinod): Return a "Bad Request" when using HTTP API.
+    return;
+  }
+
   Slave* slave = getSlave(task->slave_id());
   CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
 
   // We add the task to 'killedTasks' here because the slave
   // might be partitioned or disconnected but the master
   // doesn't know it yet.
-  slave->killedTasks.put(frameworkId, taskId);
+  slave->killedTasks.put(framework->id(), taskId);
 
   // NOTE: This task will be properly reconciled when the
   // disconnected slave re-registers with the master.
@@ -2754,7 +2787,7 @@ void Master::killTask(
               << " of framework " << *framework;
 
     KillTaskMessage message;
-    message.mutable_framework_id()->MergeFrom(frameworkId);
+    message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_task_id()->MergeFrom(taskId);
     send(slave->pid, message);
   } else {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ec17a60..5d14a53 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -457,6 +457,10 @@ private:
       Framework* framework,
       const scheduler::Call::Reconcile& reconcile);
 
+  void kill(
+      Framework* framework,
+      const scheduler::Call::Kill& kill);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 8f0f374..2bbb221 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -288,10 +288,7 @@ public:
           drop(call, "Expecting 'kill' to be present");
           return;
         }
-        KillTaskMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        message.mutable_task_id()->CopyFrom(call.kill().task_id());
-        send(master.get(), message);
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4911920..a1e49af 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -318,6 +318,130 @@ TEST_F(SchedulerTest, ReconcileTask)
 }
 
 
+TEST_F(SchedulerTest, KillTask)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update.
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
+    acknowledge->mutable_slave_id()->CopyFrom(offer.slave_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::KILL);
+
+    Call::Kill* kill = call.mutable_kill();
+    kill->mutable_task_id()->CopyFrom(taskInfo.task_id());
+    kill->mutable_slave_id()->CopyFrom(offer.slave_id());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_KILLED, event.get().update().status().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 


[07/11] mesos git commit: Removed 'uuid' field from UPDATE call.

Posted by vi...@apache.org.
Removed 'uuid' field from UPDATE call.

Review: https://reviews.apache.org/r/33465


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/32d1b67d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/32d1b67d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/32d1b67d

Branch: refs/heads/master
Commit: 32d1b67dded33c5859c271306538bb58a5de04be
Parents: c3de1e8
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Apr 22 19:16:51 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         |  3 +--
 src/examples/low_level_scheduler_libprocess.cpp | 22 +++++++++++---------
 src/examples/low_level_scheduler_pthread.cpp    | 22 +++++++++++---------
 src/sched/sched.cpp                             |  6 +++---
 src/scheduler/scheduler.cpp                     | 15 ++++++++-----
 src/tests/scheduler_tests.cpp                   |  1 +
 6 files changed, 39 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index ec9adf6..5ca64cb 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -57,8 +57,7 @@ message Event {
   }
 
   message Update {
-    required bytes uuid = 1; // TODO(benh): Replace with UpdateID.
-    required TaskStatus status = 2;
+    required TaskStatus status = 1;
   }
 
   message Message {

http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index b55ad60..bee2e7e 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -142,7 +142,7 @@ public:
           cout << endl << "Received an UPDATE event" << endl;
 
           // TODO(zuyu): Do batch processing of UPDATE events.
-          statusUpdate(event.update().uuid(), event.update().status());
+          statusUpdate(event.update().status());
           break;
         }
 
@@ -251,7 +251,7 @@ private:
     }
   }
 
-  void statusUpdate(const string& uuid, const TaskStatus& status)
+  void statusUpdate(const TaskStatus& status)
   {
     cout << "Task " << status.task_id() << " is in state " << status.state();
 
@@ -260,16 +260,18 @@ private:
     }
     cout << endl;
 
-    Call call;
-    call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(Call::ACKNOWLEDGE);
+    if (status.has_uuid()) {
+      Call call;
+      call.mutable_framework_info()->CopyFrom(framework);
+      call.set_type(Call::ACKNOWLEDGE);
 
-    Call::Acknowledge* ack = call.mutable_acknowledge();
-    ack->mutable_slave_id()->CopyFrom(status.slave_id());
-    ack->mutable_task_id ()->CopyFrom(status.task_id ());
-    ack->set_uuid(uuid);
+      Call::Acknowledge* ack = call.mutable_acknowledge();
+      ack->mutable_slave_id()->CopyFrom(status.slave_id());
+      ack->mutable_task_id ()->CopyFrom(status.task_id ());
+      ack->set_uuid(status.uuid());
 
-    mesos.send(call);
+      mesos.send(call);
+    }
 
     if (status.state() == TASK_FINISHED) {
       ++tasksFinished;

http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 64a0e44..fb8cd66 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -163,7 +163,7 @@ public:
           cout << endl << "Received an UPDATE event" << endl;
 
           // TODO(zuyu): Do batch processing of UPDATE events.
-          statusUpdate(event.update().uuid(), event.update().status());
+          statusUpdate(event.update().status());
           break;
         }
 
@@ -299,7 +299,7 @@ private:
     }
   }
 
-  void statusUpdate(const string& uuid, const TaskStatus& status)
+  void statusUpdate(const TaskStatus& status)
   {
     cout << "Task " << status.task_id() << " is in state " << status.state();
 
@@ -308,16 +308,18 @@ private:
     }
     cout << endl;
 
-    Call call;
-    call.set_type(Call::ACKNOWLEDGE);
-    call.mutable_framework_info()->CopyFrom(framework);
+    if (status.has_uuid()) {
+      Call call;
+      call.set_type(Call::ACKNOWLEDGE);
+      call.mutable_framework_info()->CopyFrom(framework);
 
-    Call::Acknowledge* ack = call.mutable_acknowledge();
-    ack->mutable_slave_id()->CopyFrom(status.slave_id());
-    ack->mutable_task_id ()->CopyFrom(status.task_id ());
-    ack->set_uuid(uuid);
+      Call::Acknowledge* ack = call.mutable_acknowledge();
+      ack->mutable_slave_id()->CopyFrom(status.slave_id());
+      ack->mutable_task_id ()->CopyFrom(status.task_id ());
+      ack->set_uuid(status.uuid());
 
-    mesos.send(call);
+      mesos.send(call);
+    }
 
     if (status.state() == TASK_FINISHED) {
       ++tasksFinished;

http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 66fd2b3..8c366ec 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -698,9 +698,9 @@ protected:
     // ensure that a 0.22.0 scheduler driver supports explicit
     // acknowledgements, even if running against a 0.21.0 cluster.
     //
-    // TODO(bmahler): Update the slave / executor driver to ensure
-    // that 'uuid' is set accurately by the time it reaches the
-    // scheduler driver. This will be required for pure bindings.
+    // TODO(bmahler): Update master and slave to ensure that 'uuid' is
+    // set accurately by the time it reaches the scheduler driver.
+    // This will be required for pure bindings.
     if (from == UPID() || pid == UPID()) {
       status.clear_uuid();
     } else {

http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 2047ee4..d3d28ee 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -44,6 +44,7 @@
 #include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/mutex.hpp>
+#include <process/pid.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
@@ -642,8 +643,15 @@ protected:
 
     update->mutable_status()->set_timestamp(message.update().timestamp());
 
-    update->set_uuid(message.update().uuid());
-    update->mutable_status()->set_uuid(message.update().uuid());
+    // If the update is generated by the master it doesn't need to be
+    // acknowledged; so we unset the UUID inside TaskStatus.
+    // TODO(vinod): Update master and slave to ensure that 'uuid' is
+    // set accurately by the time it reaches the scheduler.
+    if (UPID(message.pid()) == UPID()) {
+      update->mutable_status()->clear_uuid();
+    } else {
+      update->mutable_status()->set_uuid(message.update().uuid());
+    }
 
     receive(from, event);
   }
@@ -729,9 +737,6 @@ protected:
     status->set_message(message);
     status->set_timestamp(Clock::now().secs());
 
-    update->set_uuid(UUID::random().toBytes());
-    status->set_uuid(update->uuid());
-
     receive(None(), event);
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 54d6bc9..f2cb1d8 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -308,6 +308,7 @@ TEST_F(SchedulerTest, ReconcileTask)
   event = events.get();
   AWAIT_READY(event);
   EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_FALSE(event.get().update().status().has_uuid());
   EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
             event.get().update().status().reason());


[11/11] mesos git commit: Documented the scheduler Event/Call protobufs.

Posted by vi...@apache.org.
Documented the scheduler Event/Call protobufs.

Review: https://reviews.apache.org/r/32509


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/407af3ef
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/407af3ef
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/407af3ef

Branch: refs/heads/master
Commit: 407af3eff050cd0318037c0cbcc6f3d6e1ecd9ee
Parents: 32d1b67
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Mar 25 15:34:08 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:49 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto | 129 +++++++++++++++++++--------
 1 file changed, 92 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/407af3ef/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 5ca64cb..249ec53 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -25,22 +25,23 @@ option java_outer_classname = "Protos";
 
 
 /**
- * Low-level scheduler event API.
+ * Scheduler event API.
  *
  * An event is described using the standard protocol buffer "union"
- * trick, see https://developers.google.com/protocol-buffers/docs/techniques#union.
+ * trick, see:
+ * https://developers.google.com/protocol-buffers/docs/techniques#union.
  */
 message Event {
   // Possible event types, followed by message definitions if
   // applicable.
   enum Type {
-    SUBSCRIBED = 1;
-    OFFERS = 3;
-    RESCIND = 4;
-    UPDATE = 5;
-    MESSAGE = 6;
-    FAILURE = 7;
-    ERROR = 8;
+    SUBSCRIBED = 1; // See 'Subscribed' below.
+    OFFERS = 2;     // See 'Offers' below.
+    RESCIND = 3;    // See 'Rescind' below.
+    UPDATE = 4;     // See 'Update' below.
+    MESSAGE = 5;    // See 'Message' below.
+    FAILURE = 6;    // See 'Failure' below.
+    ERROR = 7;      // See 'Error' below.
   }
 
   // First event received when the scheduler subscribes.
@@ -48,26 +49,56 @@ message Event {
     required FrameworkID framework_id = 1;
   }
 
+  // Received whenever there are new resources that are offered to the
+  // scheduler. Each offer corresponds to a set of resources on a
+  // slave. Until the scheduler accepts or declines an offer the
+  // resources are considered allocated to the scheduler.
   message Offers {
     repeated Offer offers = 1;
   }
 
+  // Received when a particular offer is no longer valid (e.g., the
+  // slave corresponding to the offer has been removed) and hence
+  // needs to be rescinded. Any future calls ('Accept' / 'Decline') made
+  // by the scheduler regarding this offer will be invalid.
   message Rescind {
     required OfferID offer_id = 1;
   }
 
+  // Received whenever there is a status update that is generated by
+  // the executor or slave or master. Status updates should be used by
+  // executors to reliably communicate the status of the tasks that
+  // they manage. It is crucial that a terminal update (see TaskState
+  // in mesos.proto) is sent by the executor as soon as the task
+  // terminates, in order for Mesos to release the resources allocated
+  // to the task. It is also the responsibility of the scheduler to
+  // explicitly acknowledge the receipt of a status update. See
+  // 'Acknowledge' in the 'Call' section below for the semantics.
   message Update {
     required TaskStatus status = 1;
   }
 
+  // Received when a custom message generated by the executor is
+  // forwarded by the master. Note that this message is not
+  // interpreted by Mesos and is only forwarded (without reliability
+  // guarantees) to the scheduler. It is up to the executor to retry
+  // if the message is dropped for any reason.
   message Message {
     required SlaveID slave_id = 1;
     required ExecutorID executor_id = 2;
     required bytes data = 3;
   }
 
+  // Received when a slave is removed from the cluster (e.g., failed
+  // health checks) or when an executor is terminated. Note that this
+  // event coincides with receipt of terminal UPDATE events for any
+  // active tasks belonging to the slave or executor and receipt of
+  // 'Rescind' events for any outstanding offers belonging to the
+  // slave. Note that there is no guaranteed order between the
+  // 'Failure', 'Update' and 'Rescind' events when a slave or executor
+  // is removed.
   // TODO(vinod): Consider splitting the lost slave and terminated
-  // executor into separate events.
+  // executor into separate events and ensure it's reliably generated.
   message Failure {
     optional SlaveID slave_id = 1;
 
@@ -78,28 +109,33 @@ message Event {
     optional int32 status = 3;
   }
 
+  // Received when an invalid framework (e.g., unauthenticated,
+  // unauthorized) attempts to subscribe with the master. Error can
+  // also be received if scheduler sends invalid Calls (e.g., not
+  // properly initialized).
+  // TODO(vinod): Remove this once the old scheduler driver is no
+  // longer supported. With HTTP API all errors will be signaled via
+  // HTTP response codes.
   message Error {
     required string message = 1;
   }
 
-  // TODO(benh): Add a 'from' or 'sender'.
-
   // Type of the event, indicates which optional field below should be
   // present if that type has a nested message definition.
   required Type type = 1;
 
   optional Subscribed subscribed = 2;
-  optional Offers offers = 4;
-  optional Rescind rescind = 5;
-  optional Update update = 6;
-  optional Message message = 7;
-  optional Failure failure = 8;
-  optional Error error = 9;
+  optional Offers offers = 3;
+  optional Rescind rescind = 4;
+  optional Update update = 5;
+  optional Message message = 6;
+  optional Failure failure = 7;
+  optional Error error = 8;
 }
 
 
 /**
- * Low-level scheduler call API.
+ * Scheduler call API.
  *
  * Like Event, a Call is described using the standard protocol buffer
  * "union" trick (see above).
@@ -108,16 +144,16 @@ message Call {
   // Possible call types, followed by message definitions if
   // applicable.
   enum Type {
-    SUBSCRIBE = 1;    // See 'framework_info' below.
-    TEARDOWN = 3;     // Shuts down all tasks and executors.
-    REVIVE = 6;
-    DECLINE = 5;
-    ACCEPT = 12;
-    KILL = 8;
-    ACKNOWLEDGE = 9;
-    RECONCILE = 10;
-    MESSAGE = 11;
-    SHUTDOWN = 13;
+    SUBSCRIBE = 1;   // See 'framework_info' below.
+    TEARDOWN = 2;    // Shuts down all tasks/executors and removes framework.
+    ACCEPT = 3;      // See 'Accept' below.
+    DECLINE = 4;     // See 'Decline' below.
+    REVIVE = 5;      // Removes any previous filters set via ACCEPT or DECLINE.
+    KILL = 6;        // See 'Kill' below.
+    SHUTDOWN = 7;    // See 'Shutdown' below.
+    ACKNOWLEDGE = 8; // See 'Acknowledge' below.
+    RECONCILE = 9;   // See 'Reconcile' below.
+    MESSAGE = 10;    // See 'Message' below.
 
     // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
     // already subscribed frameworks as a way of stopping offers from
@@ -128,11 +164,6 @@ message Call {
     // something that is not an issue with the Event/Call API.
   }
 
-  message Decline {
-    repeated OfferID offer_ids = 1;
-    optional Filters filters = 2;
-  }
-
   // Accepts an offer, performing the specified operations
   // in a sequential manner.
   //
@@ -160,6 +191,22 @@ message Call {
     optional Filters filters = 3;
   }
 
+  // Declines an offer, signaling the master to potentially reoffer
+  // the resources to a different framework. Note that this is the
+  // same as sending an Accept call with no operations. See the
+  // comments on top of 'Accept' for semantics.
+  message Decline {
+    repeated OfferID offer_ids = 1;
+    optional Filters filters = 2;
+  }
+
+  // Kills a specific task. If the scheduler has a custom executor,
+  // the kill is forwarded to the executor and it is up to the
+  // executor to kill the task and send a TASK_KILLED (or TASK_FAILED)
+  // update. Note that Mesos releases the resources for a task once it
+  // receives a terminal update (See TaskState in mesos.proto) for it.
+  // If the task is unknown to the master, a TASK_LOST update is
+  // generated.
   message Kill {
     required TaskID task_id = 1;
     optional SlaveID slave_id = 2;
@@ -177,6 +224,11 @@ message Call {
     required SlaveID slave_id = 2;
   }
 
+  // Acknowledges the receipt of a status update. Schedulers are
+  // responsible for explicitly acknowledging the receipt of status
+  // updates that have the 'Update.status().uuid()' field set. Such
+  // status updates are retried by the slave until they are
+  // acknowledged by the scheduler.
   message Acknowledge {
     required SlaveID slave_id = 1;
     required TaskID task_id = 2;
@@ -199,6 +251,9 @@ message Call {
     repeated Task tasks = 1;
   }
 
+  // Sends arbitrary binary data to the executor. Note that Mesos
+  // neither interprets this data nor makes any guarantees about the
+  // delivery of this message to the executor.
   message Message {
     required SlaveID slave_id = 1;
     required ExecutorID executor_id = 2;
@@ -216,11 +271,11 @@ message Call {
   // present if that type has a nested message definition.
   required Type type = 2;
 
+  optional Accept accept = 3;
   optional Decline decline = 4;
-  optional Accept accept = 10;
-  optional Kill kill = 6;
+  optional Kill kill = 5;
+  optional Shutdown shutdown = 6;
   optional Acknowledge acknowledge = 7;
   optional Reconcile reconcile = 8;
   optional Message message = 9;
-  optional Shutdown shutdown = 11;
 }


[06/11] mesos git commit: Updated RECONCILE call to optionally specify a slave id.

Posted by vi...@apache.org.
Updated RECONCILE call to optionally specify a slave id.

Review: https://reviews.apache.org/r/32502/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f95fa119
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f95fa119
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f95fa119

Branch: refs/heads/master
Commit: f95fa119044c9a11c8473ab088e948e7e1c1334d
Parents: 978e72d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:54:36 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto |  20 +++--
 src/master/master.cpp                   |  32 +++++++-
 src/master/master.hpp                   |   4 +
 src/scheduler/scheduler.cpp             |   6 +-
 src/tests/scheduler_tests.cpp           | 107 +++++++++++++++++++++++++++
 5 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 928995a..51bfe8d 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -176,16 +176,20 @@ message Call {
     required bytes uuid = 3;
   }
 
-  // Allows the framework to query the status for non-terminal tasks.
+  // Allows the scheduler to query the status for non-terminal tasks.
   // This causes the master to send back the latest task status for
-  // each task in 'statuses', if possible. Tasks that are no longer
-  // known will result in a TASK_LOST update. If statuses is empty,
-  // then the master will send the latest status for each task
-  // currently known.
-  // TODO(bmahler): Add a guiding document for reconciliation or
-  // document reconciliation in-depth here.
+  // each task in 'tasks', if possible. Tasks that are no longer known
+  // will result in a TASK_LOST update. If 'tasks' is empty, then the
+  // master will send the latest status for each task currently
+  // known.
   message Reconcile {
-    repeated TaskStatus statuses = 1; // Should be non-terminal only.
+    // TODO(vinod): Support arbitrary queries, not just task state.
+    message Task {
+      required TaskID task_id = 1;
+      optional SlaveID slave_id = 2;
+    }
+
+    repeated Task tasks = 1;
   }
 
   message Message {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c9c2cc2..e762d56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1603,9 +1603,16 @@ void Master::receive(
       accept(framework, call.accept());
       break;
 
+    case scheduler::Call::RECONCILE:
+      if (!call.has_reconcile()) {
+        drop(from, call, "Expecting 'reconcile' to be present");
+        return;
+      }
+      reconcile(framework, call.reconcile());
+      break;
+
     case scheduler::Call::KILL:
     case scheduler::Call::ACKNOWLEDGE:
-    case scheduler::Call::RECONCILE:
     case scheduler::Call::MESSAGE:
       drop(from, call, "Unimplemented");
       break;
@@ -3477,6 +3484,29 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
 }
 
 
+// Translates a scheduler 'Reconcile' call into the list of
+// 'TaskStatus'es consumed by '_reconcileTasks' and hands it off.
+void Master::reconcile(
+    Framework* framework,
+    const scheduler::Call::Reconcile& reconcile)
+{
+  CHECK_NOTNULL(framework);
+
+  vector<TaskStatus> taskStatuses;
+
+  // Every 'Reconcile::Task' yields exactly one 'TaskStatus' entry.
+  foreach (const scheduler::Call::Reconcile::Task& entry, reconcile.tasks()) {
+    taskStatuses.push_back(TaskStatus());
+    TaskStatus& current = taskStatuses.back();
+
+    // The state is only a dummy value.
+    current.set_state(TASK_RUNNING);
+    current.mutable_task_id()->CopyFrom(entry.task_id());
+
+    // The slave id is optional in the call; copy it only when given.
+    if (entry.has_slave_id()) {
+      current.mutable_slave_id()->CopyFrom(entry.slave_id());
+    }
+  }
+
+  _reconcileTasks(framework, taskStatuses);
+}
+
+
 void Master::reconcileTasks(
     const UPID& from,
     const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 59d6015..ec17a60 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -453,6 +453,10 @@ private:
     const scheduler::Call::Accept& accept,
     const process::Future<std::list<process::Future<bool>>>& authorizations);
 
+  void reconcile(
+      Framework* framework,
+      const scheduler::Call::Reconcile& reconcile);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index e80a0dc..d417442 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -314,10 +314,8 @@ public:
           drop(call, "Expecting 'reconcile' to be present");
           return;
         }
-        ReconcileTasksMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
-        send(master.get(), message);
+
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4ea5528..4911920 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -211,6 +211,113 @@ TEST_F(SchedulerTest, TaskRunning)
 }
 
 
+// Verifies that a RECONCILE call for a launched task results in an
+// UPDATE event with state TASK_RUNNING and reason
+// REASON_RECONCILIATION.
+TEST_F(SchedulerTest, ReconcileTask)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  // All events delivered to 'received' are queued so the test can
+  // consume them one at a time, in order.
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  // Send the REGISTER call.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  // Keep the framework id from the REGISTERED event; it is set on
+  // every subsequent call.
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+  // Accept the first offer with a single LAUNCH operation.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  // Reconcile the launched task; only the task id is set, the
+  // optional slave id is left out.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::RECONCILE);
+
+    Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
+    task->mutable_task_id()->CopyFrom(taskInfo.task_id());
+
+    mesos.send(call);
+  }
+
+  // The next update must be marked as caused by reconciliation.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+            event.get().update().status().reason());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 


[04/11] mesos git commit: Added SHUTDOWN scheduler call.

Posted by vi...@apache.org.
Added SHUTDOWN scheduler call.

Review: https://reviews.apache.org/r/32505


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d447e76
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d447e76
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d447e76

Branch: refs/heads/master
Commit: 2d447e762a4a11e7b7f916b045acc90c65b51b90
Parents: 6a553a6
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Mar 23 18:02:46 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto |  16 ++++
 src/master/master.cpp                   |  52 +++++++++-
 src/master/master.hpp                   |   4 +
 src/messages/messages.proto             |  11 ++-
 src/scheduler/scheduler.cpp             |  24 +++++
 src/slave/slave.cpp                     |  84 +++++++++++++++-
 src/slave/slave.hpp                     |  33 ++++---
 src/tests/scheduler_tests.cpp           | 137 ++++++++++++++++++++++++---
 8 files changed, 324 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 5a94884..7a77fe1 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -71,6 +71,8 @@ message Event {
     required bytes data = 3;
   }
 
+  // TODO(vinod): Consider splitting the lost slave and terminated
+  // executor into separate events.
   message Failure {
     optional SlaveID slave_id = 1;
 
@@ -122,6 +124,7 @@ message Call {
     ACKNOWLEDGE = 9;
     RECONCILE = 10;
     MESSAGE = 11;
+    SHUTDOWN = 13;
 
     // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
     // already registered frameworks as a way of stopping offers from
@@ -169,6 +172,18 @@ message Call {
     optional SlaveID slave_id = 2;
   }
 
+  // Shuts down a custom executor. When the executor gets a shutdown
+  // event, it is expected to kill all its tasks (and send TASK_KILLED
+  // updates) and terminate. If the executor doesn’t terminate within
+  // a certain timeout (configurable via
+  // '--executor_shutdown_grace_period' slave flag), the slave will
+  // forcefully destroy the container (executor and its tasks) and
+  // transition its active tasks to TASK_LOST.
+  message Shutdown {
+    required ExecutorID executor_id = 1;
+    required SlaveID slave_id = 2;
+  }
+
   message Acknowledge {
     required SlaveID slave_id = 1;
     required TaskID task_id = 2;
@@ -212,4 +227,5 @@ message Call {
   optional Acknowledge acknowledge = 7;
   optional Reconcile reconcile = 8;
   optional Message message = 9;
+  optional Shutdown shutdown = 11;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc20be9..d443c80 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1611,6 +1611,13 @@ void Master::receive(
       reconcile(framework, call.reconcile());
       break;
 
+    // A SHUTDOWN call must carry its 'shutdown' payload. Without it
+    // the call is dropped and nothing is forwarded, since
+    // 'call.shutdown()' would otherwise be an empty message.
+    case scheduler::Call::SHUTDOWN:
+      if (!call.has_shutdown()) {
+        drop(from, call, "Expecting 'shutdown' to be present");
+        return;
+      }
+      shutdown(framework, call.shutdown());
+      break;
+
     case scheduler::Call::KILL:
       if (!call.has_kill()) {
         drop(from, call, "Expecting 'kill' to be present");
@@ -3484,13 +3491,52 @@ void Master::exitedExecutor(
 
   LOG(INFO) << "Executor " << executorId
             << " of framework " << frameworkId
-            << " on slave " << *slave << " "
+            << " on slave " << *slave << ": "
             << WSTRINGIFY(status);
 
   removeExecutor(slave, frameworkId, executorId);
 
-  // TODO(benh): Send the framework its executor's exit status?
-  // Or maybe at least have something like Scheduler::executorLost?
+  // TODO(vinod): Reliably forward this message to the scheduler.
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Not forwarding exited executor message for executor '" << executorId
+      << "' of framework " << frameworkId << " on slave " << *slave
+      << " because the framework is unknown";
+
+    return;
+  }
+
+  ExitedExecutorMessage message;
+  message.mutable_executor_id()->CopyFrom(executorId);
+  message.mutable_framework_id()->CopyFrom(frameworkId);
+  message.mutable_slave_id()->CopyFrom(slaveId);
+  message.set_status(status);
+
+  send(framework->pid, message);
+}
+
+
+// Asks the slave named in a scheduler 'Shutdown' call to shut down
+// the given executor of 'framework'. Logs a warning and does nothing
+// if that slave is not in 'slaves.registered'.
+void Master::shutdown(
+    Framework* framework,
+    const scheduler::Call::Shutdown& shutdown)
+{
+  CHECK_NOTNULL(framework);
+
+  const SlaveID& slaveId = shutdown.slave_id();
+  const ExecutorID& executorId = shutdown.executor_id();
+
+  if (!slaves.registered.contains(slaveId)) {
+    LOG(WARNING) << "Unable to shutdown executor '" << executorId
+                 << "' of framework " << framework->id()
+                 << " of unknown slave " << slaveId;
+    return;
+  }
+
+  Slave* target = slaves.registered[slaveId];
+  CHECK_NOTNULL(target);
+
+  // The message identifies the executor by framework and executor id.
+  ShutdownExecutorMessage request;
+  request.mutable_framework_id()->CopyFrom(framework->id());
+  request.mutable_executor_id()->CopyFrom(executorId);
+
+  send(target->pid, request);
+}
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5d14a53..bf1661a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -461,6 +461,10 @@ private:
       Framework* framework,
       const scheduler::Call::Kill& kill);
 
+  void shutdown(
+      Framework* framework,
+      const scheduler::Call::Shutdown& shutdown);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index bdf474b..98d859f 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -309,9 +309,14 @@ message ShutdownFrameworkMessage {
 }
 
 
-// Tells the executor to initiate a shut down by invoking
-// Executor::shutdown.
-message ShutdownExecutorMessage {}
+// Tells a slave (and consequently executor) to shutdown an executor.
+message ShutdownExecutorMessage {
+  // TODO(vinod): Make these fields required. These are made optional
+  // for backwards compatibility between 0.23.0 slave and pre 0.23.0
+  // executor driver.
+  optional ExecutorID executor_id = 1;
+  optional FrameworkID framework_id = 2;
+}
 
 
 message UpdateFrameworkMessage {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 2bbb221..7d53d51 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -292,6 +292,15 @@ public:
         break;
       }
 
+      case Call::SHUTDOWN: {
+        if (!call.has_shutdown()) {
+          drop(call, "Expecting 'shutdown' to be present");
+          return;
+        }
+        send(master.get(), call);
+        break;
+      }
+
       case Call::ACKNOWLEDGE: {
         if (!call.has_acknowledge()) {
           drop(call, "Expecting 'acknowledge' to be present");
@@ -345,6 +354,7 @@ protected:
     install<RescindResourceOfferMessage>(&MesosProcess::receive);
     install<StatusUpdateMessage>(&MesosProcess::receive);
     install<LostSlaveMessage>(&MesosProcess::receive);
+    install<ExitedExecutorMessage>(&MesosProcess::receive);
     install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
     install<FrameworkErrorMessage>(&MesosProcess::receive);
 
@@ -657,6 +667,20 @@ protected:
     receive(from, event);
   }
 
+  // Converts an 'ExitedExecutorMessage' into an 'Event::FAILURE' event
+  // carrying the slave id, executor id and status, then passes it to
+  // the 'Event' overload of 'receive'.
+  void receive(const UPID& from, const ExitedExecutorMessage& message)
+  {
+    Event failureEvent;
+    failureEvent.set_type(Event::FAILURE);
+
+    Event::Failure& details = *failureEvent.mutable_failure();
+    details.set_status(message.status());
+    details.mutable_executor_id()->CopyFrom(message.executor_id());
+    details.mutable_slave_id()->CopyFrom(message.slave_id());
+
+    receive(from, failureEvent);
+  }
+
   void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
   {
     Event event;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f68a005..e531283 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -382,6 +382,11 @@ void Slave::initialize()
       &KillTaskMessage::framework_id,
       &KillTaskMessage::task_id);
 
+  install<ShutdownExecutorMessage>(
+      &Slave::shutdownExecutor,
+      &ShutdownExecutorMessage::framework_id,
+      &ShutdownExecutorMessage::executor_id);
+
   install<ShutdownFrameworkMessage>(
       &Slave::shutdownFramework,
       &ShutdownFrameworkMessage::framework_id);
@@ -1792,7 +1797,7 @@ void Slave::shutdownFramework(
 
         if (executor->state == Executor::REGISTERING ||
             executor->state == Executor::RUNNING) {
-          shutdownExecutor(framework, executor);
+          _shutdownExecutor(framework, executor);
         } else if (executor->state == Executor::TERMINATED) {
           // NOTE: We call remove here to ensure we can remove an
           // executor (of a terminating framework) that is terminated
@@ -2578,7 +2583,7 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
                << " of framework " << update.framework_id()
                << " which is not allowed. Shutting down the executor";
 
-    shutdownExecutor(framework, executor);
+    _shutdownExecutor(framework, executor);
 
     return;
   }
@@ -3273,6 +3278,7 @@ void Slave::executorTerminated(
       // Only send ExitedExecutorMessage if it is not a Command
       // Executor because the master doesn't store them; they are
       // generated by the slave.
+      // TODO(vinod): Reliably forward this message to the master.
       if (!executor->isCommandExecutor()) {
         ExitedExecutorMessage message;
         message.mutable_slave_id()->MergeFrom(info.id());
@@ -3455,7 +3461,76 @@ void _unmonitor(
 }
 
 
-void Slave::shutdownExecutor(Framework* framework, Executor* executor)
+// Handles a request to shut down executor 'executorId' of framework
+// 'frameworkId'. The request is ignored with a warning if:
+//   - 'from' is set and is not the registered master,
+//   - the slave is RECOVERING or DISCONNECTED,
+//   - the framework is unknown or TERMINATING, or
+//   - the executor is unknown, TERMINATING or TERMINATED.
+// Otherwise the executor is handed to '_shutdownExecutor'.
+void Slave::shutdownExecutor(
+    const UPID& from,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  if (from && master != from) {
+    LOG(WARNING) << "Ignoring shutdown executor message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " from " << from << " because it is not from the"
+                 << " registered master ("
+                 << (master.isSome() ? stringify(master.get()) : "None") << ")";
+    return;
+  }
+
+  LOG(INFO) << "Asked to shut down executor '" << executorId
+            << "' of framework " << frameworkId << " by " << from;
+
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING || state == DISCONNECTED) {
+    LOG(WARNING) << "Ignoring shutdown executor message for executor '"
+                 << executorId << "' of framework " << frameworkId
+                 << " because the slave has not yet registered with the master";
+    return;
+  }
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Cannot shut down executor '" << executorId
+                 << "' of unknown framework " << frameworkId;
+    return;
+  }
+
+  CHECK(framework->state == Framework::RUNNING ||
+        framework->state == Framework::TERMINATING)
+    << framework->state;
+
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Ignoring shutdown executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the framework is terminating";
+    return;
+  }
+
+  // Bail out for an unknown executor: the lookup below assumes the
+  // executor exists and its result is dereferenced unconditionally.
+  if (!framework->executors.contains(executorId)) {
+    LOG(WARNING) << "Ignoring shutdown of unknown executor '" << executorId
+                 << "' of framework " << frameworkId;
+    return;
+  }
+
+  Executor* executor = framework->executors[executorId];
+  CHECK(executor->state == Executor::REGISTERING ||
+        executor->state == Executor::RUNNING ||
+        executor->state == Executor::TERMINATING ||
+        executor->state == Executor::TERMINATED)
+    << executor->state;
+
+  // '_shutdownExecutor' CHECKs that the executor is REGISTERING or
+  // RUNNING, so a terminating/terminated executor must not fall
+  // through to it.
+  if (executor->state == Executor::TERMINATING ||
+      executor->state == Executor::TERMINATED) {
+    LOG(WARNING) << "Ignoring shutdown executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the executor is terminating/terminated";
+    return;
+  }
+
+  _shutdownExecutor(framework, executor);
+}
+
+
+void Slave::_shutdownExecutor(Framework* framework, Executor* executor)
 {
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(executor);
@@ -3467,7 +3542,6 @@ void Slave::shutdownExecutor(Framework* framework, Executor* executor)
         framework->state == Framework::TERMINATING)
     << framework->state;
 
-
   CHECK(executor->state == Executor::REGISTERING ||
         executor->state == Executor::RUNNING)
     << executor->state;
@@ -3774,7 +3848,7 @@ Future<Nothing> Slave::_recover()
                     << "' of framework " << framework->id()
                     << " to " << executor->pid;
 
-          shutdownExecutor(framework, executor);
+          _shutdownExecutor(framework, executor);
         } else {
           LOG(INFO) << "Killing executor '" << executor->id
                     << "' of framework " << framework->id()

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d214ddb..1b8c512 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -125,6 +125,11 @@ public:
       const FrameworkID& frameworkId,
       const TaskID& taskId);
 
+  void shutdownExecutor(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId);
+
   void shutdownFramework(
       const process::UPID& from,
       const FrameworkID& frameworkId);
@@ -240,13 +245,6 @@ public:
   // and os calls.
   void _checkDiskUsage(const process::Future<double>& usage);
 
-  // Shut down an executor. This is a two phase process. First, an
-  // executor receives a shut down message (shut down phase), then
-  // after a configurable timeout the slave actually forces a kill
-  // (kill phase, via the isolator) if the executor has not
-  // exited.
-  void shutdownExecutor(Framework* framework, Executor* executor);
-
   // Invoked whenever the detector detects a change in masters.
   // Made public for testing purposes.
   void detected(const process::Future<Option<MasterInfo> >& pid);
@@ -299,13 +297,6 @@ public:
       const FrameworkID& frameworkId,
       const TaskInfo& task);
 
-  // Handle the second phase of shutting down an executor for those
-  // executors that have not properly shutdown within a timeout.
-  void shutdownExecutorTimeout(
-      const FrameworkID& frameworkId,
-      const ExecutorID& executorId,
-      const ContainerID& containerId);
-
   // Shuts down the executor if it did not register yet.
   void registerExecutorTimeout(
       const FrameworkID& frameworkId,
@@ -360,6 +351,20 @@ private:
   void _authenticate();
   void authenticationTimeout(process::Future<bool> future);
 
+  // Shut down an executor. This is a two phase process. First, an
+  // executor receives a shut down message (shut down phase), then
+  // after a configurable timeout the slave actually forces a kill
+  // (kill phase, via the isolator) if the executor has not
+  // exited.
+  void _shutdownExecutor(Framework* framework, Executor* executor);
+
+  // Handle the second phase of shutting down an executor for those
+  // executors that have not properly shutdown within a timeout.
+  void shutdownExecutorTimeout(
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const ContainerID& containerId);
+
   // Inner class used to namespace HTTP route handlers (see
   // slave/http.cpp for implementations).
   class Http

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a1e49af..a59b146 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -24,6 +24,7 @@
 
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <process/clock.hpp>
 #include <process/future.hpp>
@@ -108,14 +109,14 @@ ACTION_P(Enqueue, queue)
 
 TEST_F(SchedulerTest, TaskRunning)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
   TestContainerizer containerizer(&exec);
 
-  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
   ASSERT_SOME(slave);
 
   Callbacks callbacks;
@@ -213,14 +214,14 @@ TEST_F(SchedulerTest, TaskRunning)
 
 TEST_F(SchedulerTest, ReconcileTask)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
   TestContainerizer containerizer(&exec);
 
-  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
   ASSERT_SOME(slave);
 
   Callbacks callbacks;
@@ -442,6 +443,118 @@ TEST_F(SchedulerTest, KillTask)
 }
 
 
+// Verifies that a SHUTDOWN call reaches the executor's 'shutdown'
+// callback, and that a FAILURE event naming that executor is
+// received after 'containerizer.destroy' is called for it.
+TEST_F(SchedulerTest, ShutdownExecutor)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  // All events delivered to 'received' are queued so the test can
+  // consume them one at a time, in order.
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  // Send the REGISTER call.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  // Keep the framework id from the REGISTERED event; it is set on
+  // every subsequent call.
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+  // Accept the first offer with a single LAUNCH operation.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_FINISHED, event.get().update().status().state());
+
+  // This future is satisfied once the executor's 'shutdown' callback
+  // is invoked.
+  Future<Nothing> shutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  // Send the SHUTDOWN call for the default executor on the offer's
+  // slave. Note that the local 'shutdown' pointer below shadows the
+  // future declared above within this scope only.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::SHUTDOWN);
+
+    Call::Shutdown* shutdown = call.mutable_shutdown();
+    shutdown->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+    shutdown->mutable_slave_id()->CopyFrom(offer.slave_id());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(shutdown);
+  containerizer.destroy(id, DEFAULT_EXECUTOR_ID);
+
+  // Executor termination results in a 'FAILURE' event.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::FAILURE, event.get().type());
+  ExecutorID executorId(DEFAULT_EXECUTOR_ID);
+  EXPECT_EQ(executorId, event.get().failure().executor_id());
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 
@@ -451,7 +564,7 @@ class MesosSchedulerDriverTest : public MesosTest {};
 
 TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockScheduler sched;
@@ -503,12 +616,12 @@ ACTION(StopAndAbort)
 // abort(), no pending acknowledgements are sent.
 TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestContainerizer containerizer(&exec);
-  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -562,12 +675,12 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
 // the call to 'acknowledgeStatusUpdate' sends the ack to the master.
 TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestContainerizer containerizer(&exec);
-  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -628,10 +741,10 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
 // resources.
 TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -696,7 +809,7 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
 // generate a status with no slave id by performing reconciliation.
 TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
 {
-  Try<PID<Master> > master = StartMaster();
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockScheduler sched;


[05/11] mesos git commit: Removed MasterInfo from REGISTER and REREGISTER scheduler calls.

Posted by vi...@apache.org.
Removed MasterInfo from REGISTER and REREGISTER scheduler calls.

Review: https://reviews.apache.org/r/32504


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d3c32697
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d3c32697
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d3c32697

Branch: refs/heads/master
Commit: d3c3269731b60b9b1232a3aa9d3b0d61742413d3
Parents: f95fa11
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 17:14:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         | 2 --
 src/examples/low_level_scheduler_libprocess.cpp | 6 ++----
 src/examples/low_level_scheduler_pthread.cpp    | 6 ++----
 src/scheduler/scheduler.cpp                     | 2 --
 4 files changed, 4 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 51bfe8d..f347912 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -46,12 +46,10 @@ message Event {
 
   message Registered {
     required FrameworkID framework_id = 1;
-    required MasterInfo master_info = 2;
   }
 
   message Reregistered {
     required FrameworkID framework_id = 1;
-    required MasterInfo master_info = 2;
   }
 
   message Offers {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index 2a388fe..fbfb0f7 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -124,8 +124,7 @@ public:
           state = REGISTERED;
 
           cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered with Master '"
-               << event.registered().master_info().id() << "'" << endl;
+               << "' registered" << endl;
           break;
         }
 
@@ -135,8 +134,7 @@ public:
           state = REGISTERED;
 
           cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered to Master '"
-               << event.reregistered().master_info().id() << "'" << endl;
+               << "' re-registered" << endl;
           break;
         }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 063b7f6..944573d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -145,8 +145,7 @@ public:
           framework.mutable_id()->CopyFrom(event.registered().framework_id());
 
           cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered with Master '"
-               << event.registered().master_info().id() << "'" << endl;
+               << "' registered" << endl;
           break;
         }
 
@@ -158,8 +157,7 @@ public:
           pthread_mutex_unlock(&mutex);
 
           cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered with Master '"
-               << event.reregistered().master_info().id() << "'" << endl;
+               << "' re-registered" << endl;
           break;
         }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index d417442..8f0f374 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -576,7 +576,6 @@ protected:
     Event::Registered* registered = event.mutable_registered();
 
     registered->mutable_framework_id()->CopyFrom(message.framework_id());
-    registered->mutable_master_info()->CopyFrom(message.master_info());
 
     receive(from, event);
   }
@@ -594,7 +593,6 @@ protected:
     Event::Reregistered* reregistered = event.mutable_reregistered();
 
     reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
-    reregistered->mutable_master_info()->CopyFrom(message.master_info());
 
     receive(from, event);
   }


[08/11] mesos git commit: Renamed UNREGISTER call to TEARDOWN.

Posted by vi...@apache.org.
Renamed UNREGISTER call to TEARDOWN.

Review: https://reviews.apache.org/r/32845


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3de1e8e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3de1e8e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3de1e8e

Branch: refs/heads/master
Commit: c3de1e8ec9cf0f4c228eaca71c050e4735712c08
Parents: eb3c958
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 16:19:14 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         |   4 +-
 src/examples/low_level_scheduler_libprocess.cpp |   2 +-
 src/examples/low_level_scheduler_pthread.cpp    |   2 +-
 src/master/master.cpp                           |   5 +-
 src/scheduler/scheduler.cpp                     |   6 +-
 src/tests/scheduler_tests.cpp                   | 100 +++++++++++++++++++
 6 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index aea5607..ec9adf6 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -110,7 +110,7 @@ message Call {
   // applicable.
   enum Type {
     SUBSCRIBE = 1;    // See 'framework_info' below.
-    UNREGISTER = 3;
+    TEARDOWN = 3;     // Shuts down all tasks and executors.
     REVIVE = 6;
     DECLINE = 5;
     ACCEPT = 12;
@@ -208,7 +208,7 @@ message Call {
 
   // Identifies who generated this call. Always necessary, but the
   // only thing that needs to be set for certain calls, e.g.,
-  // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must be always
+  // SUBSCRIBE and TEARDOWN. 'framework_info.id()' must be always
   // set except when a brand new scheduler SUBSCRIBEs for the very
   // first time.
   required FrameworkInfo framework_info = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index cf85c76..b55ad60 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -311,7 +311,7 @@ private:
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(Call::UNREGISTER);
+    call.set_type(Call::TEARDOWN);
 
     mesos.send(call);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 4af576d..64a0e44 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -358,7 +358,7 @@ private:
   void finalize()
   {
     Call call;
-    call.set_type(Call::UNREGISTER);
+    call.set_type(Call::TEARDOWN);
     call.mutable_framework_info()->CopyFrom(framework);
 
     mesos.send(call);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 502d3ba..ce9d263 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1588,7 +1588,6 @@ void Master::receive(
   // framework id is set and non-empty except for SUBSCRIBE call.
 
   switch (call.type()) {
-    case scheduler::Call::UNREGISTER:
     case scheduler::Call::REVIVE:
     case scheduler::Call::DECLINE:
       drop(from, call, "Unimplemented");
@@ -1629,6 +1628,10 @@ void Master::receive(
       drop(from, call, "Unimplemented");
       break;
 
+    case scheduler::Call::TEARDOWN:
+      removeFramework(framework);
+      break;
+
     default:
       drop(from, call, "Unknown call type");
       break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 82cbfcb..2047ee4 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -229,10 +229,8 @@ public:
         break;
       }
 
-      case Call::UNREGISTER: {
-        UnregisterFrameworkMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        send(master.get(), message);
+      case Call::TEARDOWN: {
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index ddbb712..54d6bc9 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -555,6 +555,106 @@ TEST_F(SchedulerTest, ShutdownExecutor)
 }
 
 
+TEST_F(SchedulerTest, Teardown)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::SUBSCRIBE);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  FrameworkID id(event.get().subscribed().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Offer offer = event.get().offers().offers(0);
+  TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::TEARDOWN);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(shutdown);
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 


[09/11] mesos git commit: Added output stream operators for scheduler Calls and Events.

Posted by vi...@apache.org.
Added output stream operators for scheduler Calls and Events.

Review: https://reviews.apache.org/r/32506


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94cb038f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94cb038f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94cb038f

Branch: refs/heads/master
Commit: 94cb038f13d4793812fd0ae0eeb9c68ae2e4d152
Parents: 2d447e7
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Mar 25 11:36:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000

----------------------------------------------------------------------
 include/mesos/type_utils.hpp | 22 +++++++++++++++++++++-
 src/master/master.cpp        |  8 ++++----
 2 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94cb038f/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index cdf5864..0446374 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -19,12 +19,16 @@
 #ifndef __MESOS_TYPE_UTILS_H__
 #define __MESOS_TYPE_UTILS_H__
 
+#include <ostream>
+
 #include <boost/functional/hash.hpp>
 
 #include <mesos/mesos.hpp>
 
 #include <mesos/module/module.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <stout/uuid.hpp>
 
 // This file includes definitions for operators on public protobuf
@@ -338,7 +342,23 @@ inline std::ostream& operator << (
     std::ostream& stream,
     const TaskState& state)
 {
-  return stream << TaskState_descriptor()->FindValueByNumber(state)->name();
+  return stream << TaskState_Name(state);
+}
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const scheduler::Call::Type& type)
+{
+  return stream << scheduler::Call_Type_Name(type);
+}
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const scheduler::Event::Type& type)
+{
+  return stream << scheduler::Event_Type_Name(type);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/94cb038f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d443c80..2c161a9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1526,10 +1526,10 @@ void Master::drop(
 {
   // TODO(bmahler): Increment a metric.
 
-  LOG(ERROR) << "Dropping " << scheduler::Call::Type_Name(call.type())
-             << " call from framework " << call.framework_info().id()
-             << " (" << call.framework_info().name() << ") at " << from
-             << ": " << message;
+  LOG(ERROR) << "Dropping " << call.type() << " call"
+             << " from framework " << call.framework_info().id()
+             << " (" << call.framework_info().name()
+             << ") at " << from << ": " << message;
 }
 
 


[02/11] mesos git commit: Removed LAUNCH call from scheduler.proto.

Posted by vi...@apache.org.
Removed LAUNCH call from scheduler.proto.

Review: https://reviews.apache.org/r/32500


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/79086eb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/79086eb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/79086eb1

Branch: refs/heads/master
Commit: 79086eb1e5c470d4b8e425ab5e26061fc84df14b
Parents: ec0a9f3
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 15:57:42 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:46 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         | 14 ++----
 src/examples/low_level_scheduler_libprocess.cpp | 11 +++--
 src/examples/low_level_scheduler_pthread.cpp    | 11 +++--
 src/master/master.cpp                           |  3 +-
 src/scheduler/scheduler.cpp                     | 50 +-------------------
 src/tests/scheduler_tests.cpp                   | 12 +++--
 6 files changed, 29 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 783a63a..ce401aa 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -121,7 +121,6 @@ message Call {
     REVIVE = 6;
     DECLINE = 5;
     ACCEPT = 12;
-    LAUNCH = 7;
     KILL = 8;
     ACKNOWLEDGE = 9;
     RECONCILE = 10;
@@ -162,20 +161,16 @@ message Call {
   //     ]
   //   }
   //
-  // TODO(bmahler): Not implemented.
+  // Note that any of the offer’s resources not used in the 'Accept'
+  // call (e.g., to launch a task) are considered unused and might be
+  // reoffered to other frameworks. In other words, the same OfferID
+  // cannot be used in more than one 'Accept' call.
   message Accept {
     repeated OfferID offer_ids = 1;
     repeated Offer.Operation operations = 2;
     optional Filters filters = 3;
   }
 
-  // TODO(bmahler): Deprecate Launch in favor of Accept.
-  message Launch {
-    repeated TaskInfo task_infos = 1;
-    repeated OfferID offer_ids = 2;
-    optional Filters filters = 3;
-  }
-
   message Kill {
     required TaskID task_id = 1;
   }
@@ -216,7 +211,6 @@ message Call {
   optional Request request = 3;
   optional Decline decline = 4;
   optional Accept accept = 10;
-  optional Launch launch = 5;
   optional Kill kill = 6;
   optional Acknowledge acknowledge = 7;
   optional Reconcile reconcile = 8;

http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index 63d34ee..2a388fe 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -249,13 +249,16 @@ private:
 
       Call call;
       call.mutable_framework_info()->CopyFrom(framework);
-      call.set_type(Call::LAUNCH);
+      call.set_type(Call::ACCEPT);
 
-      Call::Launch* launch = call.mutable_launch();
+      Call::Accept* accept = call.mutable_accept();
+      accept->add_offer_ids()->CopyFrom(offer.id());
+
+      Offer::Operation* operation = accept->add_operations();
+      operation->set_type(Offer::Operation::LAUNCH);
       foreach (const TaskInfo& taskInfo, tasks) {
-        launch->add_task_infos()->CopyFrom(taskInfo);
+        operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
       }
-      launch->add_offer_ids()->CopyFrom(offer.id());
 
       mesos.send(call);
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 6d1f938..063b7f6 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -298,14 +298,17 @@ private:
       }
 
       Call call;
-      call.set_type(Call::LAUNCH);
       call.mutable_framework_info()->CopyFrom(framework);
+      call.set_type(Call::ACCEPT);
 
-      Call::Launch* launch = call.mutable_launch();
+      Call::Accept* accept = call.mutable_accept();
+      accept->add_offer_ids()->CopyFrom(offer.id());
+
+      Offer::Operation* operation = accept->add_operations();
+      operation->set_type(Offer::Operation::LAUNCH);
       foreach (const TaskInfo& taskInfo, tasks) {
-        launch->add_task_infos()->CopyFrom(taskInfo);
+        operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
       }
-      launch->add_offer_ids()->CopyFrom(offer.id());
 
       mesos.send(call);
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f3462d1..865ff89 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1554,6 +1554,8 @@ void Master::receive(
     const UPID& from,
     const scheduler::Call& call)
 {
+  // TODO(vinod): Add metrics for calls.
+
   const FrameworkInfo& frameworkInfo = call.framework_info();
 
   // For REGISTER and REREGISTER calls, no need to look up the
@@ -1602,7 +1604,6 @@ void Master::receive(
       accept(framework, call.accept());
       break;
 
-    case scheduler::Call::LAUNCH:
     case scheduler::Call::KILL:
     case scheduler::Call::ACKNOWLEDGE:
     case scheduler::Call::RECONCILE:

http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index bd9fced..6fbd991 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -295,33 +295,6 @@ public:
         break;
       }
 
-      case Call::LAUNCH: {
-        if (!call.has_launch()) {
-          drop(call, "Expecting 'launch' to be present");
-          return;
-        }
-        // We do some local validation here, but really this should
-        // all happen in the master so it's only implemented once.
-        foreach (TaskInfo& task,
-                 *call.mutable_launch()->mutable_task_infos()) {
-          // Set ExecutorInfo::framework_id if missing since this
-          // field was added to the API later and thus was made
-          // optional.
-          if (task.has_executor() && !task.executor().has_framework_id()) {
-            task.mutable_executor()->mutable_framework_id()->CopyFrom(
-                call.framework_info().id());
-          }
-        }
-
-        LaunchTasksMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
-        message.mutable_filters()->CopyFrom(call.launch().filters());
-        message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids());
-        message.mutable_tasks()->CopyFrom(call.launch().task_infos());
-        send(master.get(), message);
-        break;
-      }
-
       case Call::KILL: {
         if (!call.has_kill()) {
           drop(call, "Expecting 'kill' to be present");
@@ -766,28 +739,7 @@ protected:
 
   void drop(const Call& call, const string& message)
   {
-    VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message;
-
-    switch (call.type()) {
-      case Call::LAUNCH: {
-        // We drop the tasks preemptively (enqueing update events that
-        // put the task in TASK_LOST). This is a hack for now, to keep
-        // the tasks from being forever in PENDING state, when
-        // actually the master never received the launch.
-        // Unfortuantely this is insufficient since it doesn't capture
-        // the case when the scheduler process sends it but the master
-        // never receives it (i.e., during a master failover). In the
-        // future, this should be solved by higher-level abstractions
-        // and this hack should be considered for removal.
-        foreach (const TaskInfo& task, call.launch().task_infos()) {
-          drop(task, message);
-        }
-        break;
-      }
-
-      default:
-        break;
-    }
+    LOG(WARNING) << "Dropping " << call.type() << ": " << message;
   }
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4a89a7a..4ea5528 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -185,10 +185,14 @@ TEST_F(SchedulerTest, TaskRunning)
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
     call.mutable_framework_info()->mutable_id()->CopyFrom(id);
-    call.set_type(Call::LAUNCH);
-    call.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
-    call.mutable_launch()->add_offer_ids()->CopyFrom(
-        event.get().offers().offers(0).id());
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(event.get().offers().offers(0).id());
+
+    Offer::Operation* operation = accept->add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
 
     mesos.send(call);
   }


[10/11] mesos git commit: Added SUBSCRIBE call and SUBSCRIBED event.

Posted by vi...@apache.org.
Added SUBSCRIBE call and SUBSCRIBED event.

Review: https://reviews.apache.org/r/32844


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb3c958e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb3c958e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb3c958e

Branch: refs/heads/master
Commit: eb3c958e3932a823115a502c6b0e57a29b54bf94
Parents: 94cb038
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 14:35:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto         | 22 ++++----
 src/examples/low_level_scheduler_libprocess.cpp | 28 +++-------
 src/examples/low_level_scheduler_pthread.cpp    | 28 +++-------
 src/master/master.cpp                           | 13 +++--
 src/scheduler/scheduler.cpp                     | 57 +++++++++-----------
 src/tests/scheduler_tests.cpp                   | 24 ++++-----
 6 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 7a77fe1..aea5607 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -34,8 +34,7 @@ message Event {
   // Possible event types, followed by message definitions if
   // applicable.
   enum Type {
-    REGISTERED = 1;
-    REREGISTERED = 2;
+    SUBSCRIBED = 1;
     OFFERS = 3;
     RESCIND = 4;
     UPDATE = 5;
@@ -44,11 +43,8 @@ message Event {
     ERROR = 8;
   }
 
-  message Registered {
-    required FrameworkID framework_id = 1;
-  }
-
-  message Reregistered {
+  // First event received when the scheduler subscribes.
+  message Subscribed {
     required FrameworkID framework_id = 1;
   }
 
@@ -93,8 +89,7 @@ message Event {
   // present if that type has a nested message definition.
   required Type type = 1;
 
-  optional Registered registered = 2;
-  optional Reregistered reregistered = 3;
+  optional Subscribed subscribed = 2;
   optional Offers offers = 4;
   optional Rescind rescind = 5;
   optional Update update = 6;
@@ -114,8 +109,7 @@ message Call {
   // Possible call types, followed by message definitions if
   // applicable.
   enum Type {
-    REGISTER = 1;
-    REREGISTER = 2;
+    SUBSCRIBE = 1;    // See 'framework_info' below.
     UNREGISTER = 3;
     REVIVE = 6;
     DECLINE = 5;
@@ -127,7 +121,7 @@ message Call {
     SHUTDOWN = 13;
 
     // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
-    // already registered frameworks as a way of stopping offers from
+    // already subscribed frameworks as a way of stopping offers from
     // being generated and other events from being sent by the master.
     // Note that this functionality existed originally to support
     // SchedulerDriver::abort which was only necessary to handle
@@ -214,7 +208,9 @@ message Call {
 
   // Identifies who generated this call. Always necessary, but the
   // only thing that needs to be set for certain calls, e.g.,
-  // REGISTER, REREGISTER, and UNREGISTER.
+  // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must always be
+  // set, except when a brand new scheduler SUBSCRIBEs for the very
+  // first time.
   required FrameworkInfo framework_info = 1;
 
   // Type of the call, indicates which optional field below should be

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index fbfb0f7..cf85c76 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -117,24 +117,13 @@ public:
       events.pop();
 
       switch (event.type()) {
-        case Event::REGISTERED: {
-          cout << endl << "Received a REGISTERED event" << endl;
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
 
-          framework.mutable_id()->CopyFrom(event.registered().framework_id());
-          state = REGISTERED;
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+          state = SUBSCRIBED;
 
-          cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered" << endl;
-          break;
-        }
-
-        case Event::REREGISTERED: {
-          cout << endl << "Received a REREGISTERED event" << endl;
-
-          state = REGISTERED;
-
-          cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered" << endl;
+          cout << "Subscribed with ID '" << framework.id() << endl;
           break;
         }
 
@@ -303,14 +292,13 @@ private:
 
   void doReliableRegistration()
   {
-    if (state == REGISTERED) {
+    if (state == SUBSCRIBED) {
       return;
     }
 
     Call call;
     call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(
-        state == INITIALIZING ? Call::REGISTER : Call::REREGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
 
@@ -334,7 +322,7 @@ private:
 
   enum State {
     INITIALIZING = 0,
-    REGISTERED = 1,
+    SUBSCRIBED = 1,
     DISCONNECTED = 2
   } state;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 944573d..4af576d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -135,29 +135,16 @@ public:
       events.pop();
 
       switch (event.type()) {
-        case Event::REGISTERED: {
-          cout << endl << "Received a REGISTERED event" << endl;
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
 
           pthread_mutex_lock(&mutex);
-          state = REGISTERED;
+          state = SUBSCRIBED;
           pthread_mutex_unlock(&mutex);
 
-          framework.mutable_id()->CopyFrom(event.registered().framework_id());
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
 
-          cout << "Framework '" << event.registered().framework_id().value()
-               << "' registered" << endl;
-          break;
-        }
-
-        case Event::REREGISTERED: {
-          cout << endl << "Received a REREGISTERED event" << endl;
-
-          pthread_mutex_lock(&mutex);
-          state = REGISTERED;
-          pthread_mutex_unlock(&mutex);
-
-          cout << "Framework '" << event.reregistered().framework_id().value()
-               << "' re-registered" << endl;
+          cout << "Subscribed with ID '" << framework.id() << endl;
           break;
         }
 
@@ -362,8 +349,7 @@ private:
 
     Call call;
     call.mutable_framework_info()->CopyFrom(framework);
-    call.set_type(
-        state == CONNECTED ? Call::REGISTER : Call::REREGISTER);
+    call.set_type(Call::SUBSCRIBE);
     pthread_mutex_unlock(&mutex);
 
     mesos.send(call);
@@ -390,7 +376,7 @@ private:
   enum State {
     INITIALIZING = 0,
     CONNECTED = 1,
-    REGISTERED = 2,
+    SUBSCRIBED = 2,
     DISCONNECTED = 3,
     DONE = 4
   } state;

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2c161a9..502d3ba 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1555,15 +1555,13 @@ void Master::receive(
     const scheduler::Call& call)
 {
   // TODO(vinod): Add metrics for calls.
-
+  // TODO(vinod): Implement the unimplemented calls.
   const FrameworkInfo& frameworkInfo = call.framework_info();
 
-  // For REGISTER and REREGISTER calls, no need to look up the
-  // framework. Therefore, we handle them first and separately from
-  // other types of calls.
+  // For a SUBSCRIBE call, there is no need to look up the framework.
+  // Therefore, we handle it first and separately from other types of calls.
   switch (call.type()) {
-    case scheduler::Call::REGISTER:
-    case scheduler::Call::REREGISTER:
+    case scheduler::Call::SUBSCRIBE:
       drop(from, call, "Unimplemented");
       return;
 
@@ -1586,7 +1584,8 @@ void Master::receive(
   }
 
   // TODO(jieyu): Validate frameworkInfo to make sure it's the same as
-  // the one that the framework used during registration.
+  // the one that the framework used during registration and that the
+  // framework id is set and non-empty except for SUBSCRIBE call.
 
   switch (call.type()) {
     case scheduler::Call::UNREGISTER:

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 7d53d51..82cbfcb 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -200,9 +200,10 @@ public:
       call.mutable_framework_info()->set_user(user.get());
     }
 
-    // Only a REGISTER should not have set the framework ID.
-    if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
-      drop(call, "Call is mising FrameworkInfo.id");
+    // Only a SUBSCRIBE call may not have set the framework ID.
+    if (call.type() != Call::SUBSCRIBE &&
+        (!call.framework_info().has_id() || call.framework_info().id() == "")) {
+      drop(call, "Call is missing FrameworkInfo.id");
       return;
     }
 
@@ -213,18 +214,18 @@ public:
     }
 
     switch (call.type()) {
-      case Call::REGISTER: {
-        RegisterFrameworkMessage message;
-        message.mutable_framework()->CopyFrom(call.framework_info());
-        send(master.get(), message);
-        break;
-      }
-
-      case Call::REREGISTER: {
-        ReregisterFrameworkMessage message;
-        message.mutable_framework()->CopyFrom(call.framework_info());
-        message.set_failover(failover);
-        send(master.get(), message);
+      case Call::SUBSCRIBE: {
+        if (!call.framework_info().has_id() ||
+            call.framework_info().id() == "") {
+          RegisterFrameworkMessage message;
+          message.mutable_framework()->CopyFrom(call.framework_info());
+          send(master.get(), message);
+        } else {
+          ReregisterFrameworkMessage message;
+          message.mutable_framework()->CopyFrom(call.framework_info());
+          message.set_failover(failover);
+          send(master.get(), message);
+        }
         break;
       }
 
@@ -386,6 +387,7 @@ protected:
       VLOG(1) << "New master detected at " << master.get();
 
       if (credential.isSome()) {
+        // TODO(vinod): Do pure HTTP Authentication instead of SASL.
         // Authenticate with the master.
         authenticate();
       } else {
@@ -572,34 +574,27 @@ protected:
 
   void receive(const UPID& from, const FrameworkRegisteredMessage& message)
   {
-    // We've now registered at least once with the master so we're no
-    // longer failing over. See the comment where 'failover' is
-    // declared for further details.
-    failover = false;
-
-    Event event;
-    event.set_type(Event::REGISTERED);
-
-    Event::Registered* registered = event.mutable_registered();
-
-    registered->mutable_framework_id()->CopyFrom(message.framework_id());
-
-    receive(from, event);
+    subscribed(from, message.framework_id());
   }
 
   void receive(const UPID& from, const FrameworkReregisteredMessage& message)
   {
+    subscribed(from, message.framework_id());
+  }
+
+  void subscribed(const UPID& from, const FrameworkID& frameworkId)
+  {
     // We've now registered at least once with the master so we're no
     // longer failing over. See the comment where 'failover' is
     // declared for further details.
     failover = false;
 
     Event event;
-    event.set_type(Event::REREGISTERED);
+    event.set_type(Event::SUBSCRIBED);
 
-    Event::Reregistered* reregistered = event.mutable_reregistered();
+    Event::Subscribed* subscribed = event.mutable_subscribed();
 
-    reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
+    subscribed->mutable_framework_id()->CopyFrom(frameworkId);
 
     receive(from, event);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a59b146..ddbb712 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -142,16 +142,16 @@ TEST_F(SchedulerTest, TaskRunning)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -247,16 +247,16 @@ TEST_F(SchedulerTest, ReconcileTask)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -354,16 +354,16 @@ TEST_F(SchedulerTest, KillTask)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);
@@ -478,16 +478,16 @@ TEST_F(SchedulerTest, ShutdownExecutor)
   {
     Call call;
     call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
-    call.set_type(Call::REGISTER);
+    call.set_type(Call::SUBSCRIBE);
 
     mesos.send(call);
   }
 
   Future<Event> event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::REGISTERED, event.get().type());
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  FrameworkID id(event.get().registered().framework_id());
+  FrameworkID id(event.get().subscribed().framework_id());
 
   event = events.get();
   AWAIT_READY(event);