You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/04/25 23:33:28 UTC
[01/11] mesos git commit: Removed REQUEST call from scheduler.proto.
Repository: mesos
Updated Branches:
refs/heads/master ec0a9f340 -> 407af3eff
Removed REQUEST call from scheduler.proto.
Review: https://reviews.apache.org/r/32501/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/978e72d4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/978e72d4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/978e72d4
Branch: refs/heads/master
Commit: 978e72d4abc633850ea168e3f749dca4ca482ae8
Parents: 79086eb
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:10:32 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:46 2015 -1000
----------------------------------------------------------------------
include/mesos/mesos.proto | 2 ++
include/mesos/scheduler/scheduler.proto | 6 ------
src/master/master.cpp | 1 -
src/master/master.hpp | 1 +
src/scheduler/scheduler.cpp | 12 ------------
5 files changed, 3 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 3a8e8bf..967b1e3 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -609,6 +609,8 @@ message PerfStatistics {
* to proactively influence the allocator. If 'slave_id' is provided
* then this request is assumed to only apply to resources on that
* slave.
+ *
+ * TODO(vinod): Remove this once the old driver is removed.
*/
message Request {
optional SlaveID slave_id = 1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index ce401aa..928995a 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -117,7 +117,6 @@ message Call {
REGISTER = 1;
REREGISTER = 2;
UNREGISTER = 3;
- REQUEST = 4;
REVIVE = 6;
DECLINE = 5;
ACCEPT = 12;
@@ -135,10 +134,6 @@ message Call {
// something that is not an issue with the Event/Call API.
}
- message Request {
- repeated mesos.Request requests = 1;
- }
-
message Decline {
repeated OfferID offer_ids = 1;
optional Filters filters = 2;
@@ -208,7 +203,6 @@ message Call {
// present if that type has a nested message definition.
required Type type = 2;
- optional Request request = 3;
optional Decline decline = 4;
optional Accept accept = 10;
optional Kill kill = 6;
http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 865ff89..c9c2cc2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1590,7 +1590,6 @@ void Master::receive(
switch (call.type()) {
case scheduler::Call::UNREGISTER:
- case scheduler::Call::REQUEST:
case scheduler::Call::REVIVE:
case scheduler::Call::DECLINE:
drop(from, call, "Unimplemented");
http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 550d2c5..59d6015 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -129,6 +129,7 @@ public:
const process::UPID& from,
const FrameworkID& frameworkId);
+ // TODO(vinod): Remove this once the old driver is removed.
void resourceRequest(
const process::UPID& from,
const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/978e72d4/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 6fbd991..e80a0dc 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -235,18 +235,6 @@ public:
break;
}
- case Call::REQUEST: {
- if (!call.has_request()) {
- drop(call, "Expecting 'request' to be present");
- return;
- }
- ResourceRequestMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- message.mutable_requests()->CopyFrom(call.request().requests());
- send(master.get(), message);
- break;
- }
-
case Call::DECLINE: {
if (!call.has_decline()) {
drop(call, "Expecting 'decline' to be present");
[03/11] mesos git commit: Updated KILL to optionally include SlaveID.
Posted by vi...@apache.org.
Updated KILL to optionally include SlaveID.
Review: https://reviews.apache.org/r/32843
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a553a64
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a553a64
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a553a64
Branch: refs/heads/master
Commit: 6a553a6431662ddb24bef23f1c9cd54af3ebf865
Parents: d3c3269
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 10:42:12 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 1 +
src/master/master.cpp | 45 ++++++++--
src/master/master.hpp | 4 +
src/scheduler/scheduler.cpp | 5 +-
src/tests/scheduler_tests.cpp | 124 +++++++++++++++++++++++++++
5 files changed, 169 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index f347912..5a94884 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -166,6 +166,7 @@ message Call {
message Kill {
required TaskID task_id = 1;
+ optional SlaveID slave_id = 2;
}
message Acknowledge {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e762d56..cc20be9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1612,6 +1612,12 @@ void Master::receive(
break;
case scheduler::Call::KILL:
+ if (!call.has_kill()) {
+ drop(from, call, "Expecting 'kill' to be present");
+ }
+ kill(framework, call.kill());
+ break;
+
case scheduler::Call::ACKNOWLEDGE:
case scheduler::Call::MESSAGE:
drop(from, call, "Unimplemented");
@@ -2687,11 +2693,11 @@ void Master::killTask(
const FrameworkID& frameworkId,
const TaskID& taskId)
{
- ++metrics->messages_kill_task;
-
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
+ ++metrics->messages_kill_task;
+
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -2708,13 +2714,28 @@ void Master::killTask(
return;
}
+ scheduler::Call::Kill call;
+ call.mutable_task_id()->CopyFrom(taskId);
+
+ kill(framework, call);
+}
+
+
+void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
+{
+ CHECK_NOTNULL(framework);
+
+ const TaskID& taskId = kill.task_id();
+ const Option<SlaveID> slaveId =
+ kill.has_slave_id() ? Option<SlaveID>(kill.slave_id()) : None();
+
if (framework->pendingTasks.contains(taskId)) {
// Remove from pending tasks.
framework->pendingTasks.erase(taskId);
const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId,
- None(),
+ framework->id(),
+ slaveId,
taskId,
TASK_KILLED,
TaskStatus::SOURCE_MASTER,
@@ -2733,18 +2754,30 @@ void Master::killTask(
TaskStatus status;
status.mutable_task_id()->CopyFrom(taskId);
+ if (slaveId.isSome()) {
+ status.mutable_slave_id()->CopyFrom(slaveId.get());
+ }
_reconcileTasks(framework, {status});
return;
}
+ if (slaveId.isSome() && !(slaveId.get() == task->slave_id())) {
+ LOG(WARNING) << "Cannot kill task " << taskId << " of slave "
+ << slaveId.get() << " of framework " << *framework
+ << " because it belongs to different slave "
+ << task->slave_id();
+ // TODO(vinod): Return a "Bad Request" when using HTTP API.
+ return;
+ }
+
Slave* slave = getSlave(task->slave_id());
CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
// We add the task to 'killedTasks' here because the slave
// might be partitioned or disconnected but the master
// doesn't know it yet.
- slave->killedTasks.put(frameworkId, taskId);
+ slave->killedTasks.put(framework->id(), taskId);
// NOTE: This task will be properly reconciled when the
// disconnected slave re-registers with the master.
@@ -2754,7 +2787,7 @@ void Master::killTask(
<< " of framework " << *framework;
KillTaskMessage message;
- message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_task_id()->MergeFrom(taskId);
send(slave->pid, message);
} else {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ec17a60..5d14a53 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -457,6 +457,10 @@ private:
Framework* framework,
const scheduler::Call::Reconcile& reconcile);
+ void kill(
+ Framework* framework,
+ const scheduler::Call::Kill& kill);
+
bool elected() const
{
return leader.isSome() && leader.get() == info_;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 8f0f374..2bbb221 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -288,10 +288,7 @@ public:
drop(call, "Expecting 'kill' to be present");
return;
}
- KillTaskMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- message.mutable_task_id()->CopyFrom(call.kill().task_id());
- send(master.get(), message);
+ send(master.get(), call);
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a553a64/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4911920..a1e49af 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -318,6 +318,130 @@ TEST_F(SchedulerTest, ReconcileTask)
}
+TEST_F(SchedulerTest, KillTask)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ scheduler::Mesos mesos(
+ master.get(),
+ DEFAULT_CREDENTIAL,
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.set_type(Call::REGISTER);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+ FrameworkID id(event.get().registered().framework_id());
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_NE(0, event.get().offers().offers().size());
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Offer offer = event.get().offers().offers(0);
+ TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+ {
+ // Acknowledge TASK_RUNNING update.
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
+ acknowledge->mutable_slave_id()->CopyFrom(offer.slave_id());
+ acknowledge->set_uuid(event.get().update().status().uuid());
+
+ mesos.send(call);
+ }
+
+ EXPECT_CALL(exec, killTask(_, _))
+ .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::KILL);
+
+ Call::Kill* kill = call.mutable_kill();
+ kill->mutable_task_id()->CopyFrom(taskInfo.task_id());
+ kill->mutable_slave_id()->CopyFrom(offer.slave_id());
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_KILLED, event.get().update().status().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
[07/11] mesos git commit: Removed 'uuid' field from UPDATE call.
Posted by vi...@apache.org.
Removed 'uuid' field from UPDATE call.
Review: https://reviews.apache.org/r/33465
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/32d1b67d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/32d1b67d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/32d1b67d
Branch: refs/heads/master
Commit: 32d1b67dded33c5859c271306538bb58a5de04be
Parents: c3de1e8
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Apr 22 19:16:51 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 3 +--
src/examples/low_level_scheduler_libprocess.cpp | 22 +++++++++++---------
src/examples/low_level_scheduler_pthread.cpp | 22 +++++++++++---------
src/sched/sched.cpp | 6 +++---
src/scheduler/scheduler.cpp | 15 ++++++++-----
src/tests/scheduler_tests.cpp | 1 +
6 files changed, 39 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index ec9adf6..5ca64cb 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -57,8 +57,7 @@ message Event {
}
message Update {
- required bytes uuid = 1; // TODO(benh): Replace with UpdateID.
- required TaskStatus status = 2;
+ required TaskStatus status = 1;
}
message Message {
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index b55ad60..bee2e7e 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -142,7 +142,7 @@ public:
cout << endl << "Received an UPDATE event" << endl;
// TODO(zuyu): Do batch processing of UPDATE events.
- statusUpdate(event.update().uuid(), event.update().status());
+ statusUpdate(event.update().status());
break;
}
@@ -251,7 +251,7 @@ private:
}
}
- void statusUpdate(const string& uuid, const TaskStatus& status)
+ void statusUpdate(const TaskStatus& status)
{
cout << "Task " << status.task_id() << " is in state " << status.state();
@@ -260,16 +260,18 @@ private:
}
cout << endl;
- Call call;
- call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(Call::ACKNOWLEDGE);
+ if (status.has_uuid()) {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(framework);
+ call.set_type(Call::ACKNOWLEDGE);
- Call::Acknowledge* ack = call.mutable_acknowledge();
- ack->mutable_slave_id()->CopyFrom(status.slave_id());
- ack->mutable_task_id ()->CopyFrom(status.task_id ());
- ack->set_uuid(uuid);
+ Call::Acknowledge* ack = call.mutable_acknowledge();
+ ack->mutable_slave_id()->CopyFrom(status.slave_id());
+ ack->mutable_task_id ()->CopyFrom(status.task_id ());
+ ack->set_uuid(status.uuid());
- mesos.send(call);
+ mesos.send(call);
+ }
if (status.state() == TASK_FINISHED) {
++tasksFinished;
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 64a0e44..fb8cd66 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -163,7 +163,7 @@ public:
cout << endl << "Received an UPDATE event" << endl;
// TODO(zuyu): Do batch processing of UPDATE events.
- statusUpdate(event.update().uuid(), event.update().status());
+ statusUpdate(event.update().status());
break;
}
@@ -299,7 +299,7 @@ private:
}
}
- void statusUpdate(const string& uuid, const TaskStatus& status)
+ void statusUpdate(const TaskStatus& status)
{
cout << "Task " << status.task_id() << " is in state " << status.state();
@@ -308,16 +308,18 @@ private:
}
cout << endl;
- Call call;
- call.set_type(Call::ACKNOWLEDGE);
- call.mutable_framework_info()->CopyFrom(framework);
+ if (status.has_uuid()) {
+ Call call;
+ call.set_type(Call::ACKNOWLEDGE);
+ call.mutable_framework_info()->CopyFrom(framework);
- Call::Acknowledge* ack = call.mutable_acknowledge();
- ack->mutable_slave_id()->CopyFrom(status.slave_id());
- ack->mutable_task_id ()->CopyFrom(status.task_id ());
- ack->set_uuid(uuid);
+ Call::Acknowledge* ack = call.mutable_acknowledge();
+ ack->mutable_slave_id()->CopyFrom(status.slave_id());
+ ack->mutable_task_id ()->CopyFrom(status.task_id ());
+ ack->set_uuid(status.uuid());
- mesos.send(call);
+ mesos.send(call);
+ }
if (status.state() == TASK_FINISHED) {
++tasksFinished;
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 66fd2b3..8c366ec 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -698,9 +698,9 @@ protected:
// ensure that a 0.22.0 scheduler driver supports explicit
// acknowledgements, even if running against a 0.21.0 cluster.
//
- // TODO(bmahler): Update the slave / executor driver to ensure
- // that 'uuid' is set accurately by the time it reaches the
- // scheduler driver. This will be required for pure bindings.
+ // TODO(bmahler): Update master and slave to ensure that 'uuid' is
+ // set accurately by the time it reaches the scheduler driver.
+ // This will be required for pure bindings.
if (from == UPID() || pid == UPID()) {
status.clear_uuid();
} else {
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 2047ee4..d3d28ee 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -44,6 +44,7 @@
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
+#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
@@ -642,8 +643,15 @@ protected:
update->mutable_status()->set_timestamp(message.update().timestamp());
- update->set_uuid(message.update().uuid());
- update->mutable_status()->set_uuid(message.update().uuid());
+ // If the update is generated by the master, it doesn't need to be
+ // acknowledged, so we unset the UUID inside TaskStatus.
+ // TODO(vinod): Update master and slave to ensure that 'uuid' is
+ // set accurately by the time it reaches the scheduler.
+ if (UPID(message.pid()) == UPID()) {
+ update->mutable_status()->clear_uuid();
+ } else {
+ update->mutable_status()->set_uuid(message.update().uuid());
+ }
receive(from, event);
}
@@ -729,9 +737,6 @@ protected:
status->set_message(message);
status->set_timestamp(Clock::now().secs());
- update->set_uuid(UUID::random().toBytes());
- status->set_uuid(update->uuid());
-
receive(None(), event);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/32d1b67d/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 54d6bc9..f2cb1d8 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -308,6 +308,7 @@ TEST_F(SchedulerTest, ReconcileTask)
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_FALSE(event.get().update().status().has_uuid());
EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
event.get().update().status().reason());
[11/11] mesos git commit: Documented the scheduler Event/Call protobufs.
Posted by vi...@apache.org.
Documented the scheduler Event/Call protobufs.
Review: https://reviews.apache.org/r/32509
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/407af3ef
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/407af3ef
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/407af3ef
Branch: refs/heads/master
Commit: 407af3eff050cd0318037c0cbcc6f3d6e1ecd9ee
Parents: 32d1b67
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Mar 25 15:34:08 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:49 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 129 +++++++++++++++++++--------
1 file changed, 92 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/407af3ef/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 5ca64cb..249ec53 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -25,22 +25,23 @@ option java_outer_classname = "Protos";
/**
- * Low-level scheduler event API.
+ * Scheduler event API.
*
* An event is described using the standard protocol buffer "union"
- * trick, see https://developers.google.com/protocol-buffers/docs/techniques#union.
+ * trick, see:
+ * https://developers.google.com/protocol-buffers/docs/techniques#union.
*/
message Event {
// Possible event types, followed by message definitions if
// applicable.
enum Type {
- SUBSCRIBED = 1;
- OFFERS = 3;
- RESCIND = 4;
- UPDATE = 5;
- MESSAGE = 6;
- FAILURE = 7;
- ERROR = 8;
+ SUBSCRIBED = 1; // See 'Subscribed' below.
+ OFFERS = 2; // See 'Offers' below.
+ RESCIND = 3; // See 'Rescind' below.
+ UPDATE = 4; // See 'Update' below.
+ MESSAGE = 5; // See 'Message' below.
+ FAILURE = 6; // See 'Failure' below.
+ ERROR = 7; // See 'Error' below.
}
// First event received when the scheduler subscribes.
@@ -48,26 +49,56 @@ message Event {
required FrameworkID framework_id = 1;
}
+ // Received whenever there are new resources that are offered to the
+ // scheduler. Each offer corresponds to a set of resources on a
+ // slave. Until the scheduler accepts or declines an offer the
+ // resources are considered allocated to the scheduler.
message Offers {
repeated Offer offers = 1;
}
+ // Received when a particular offer is no longer valid (e.g., the
+ // slave corresponding to the offer has been removed) and hence
+ // needs to be rescinded. Any future calls ('Accept' / 'Decline') made
+ // by the scheduler regarding this offer will be invalid.
message Rescind {
required OfferID offer_id = 1;
}
+ // Received whenever there is a status update that is generated by
+ // the executor or slave or master. Status updates should be used by
+ // executors to reliably communicate the status of the tasks that
+ // they manage. It is crucial that a terminal update (see TaskState
+ // in mesos.proto) is sent by the executor as soon as the task
+ // terminates, in order for Mesos to release the resources allocated
+ // to the task. It is also the responsibility of the scheduler to
+ // explicitly acknowledge the receipt of a status update. See
+ // 'Acknowledge' in the 'Call' section below for the semantics.
message Update {
required TaskStatus status = 1;
}
+ // Received when a custom message generated by the executor is
+ // forwarded by the master. Note that this message is not
+ // interpreted by Mesos and is only forwarded (without reliability
+ // guarantees) to the scheduler. It is up to the executor to retry
+ // if the message is dropped for any reason.
message Message {
required SlaveID slave_id = 1;
required ExecutorID executor_id = 2;
required bytes data = 3;
}
+ // Received when a slave is removed from the cluster (e.g., failed
+ // health checks) or when an executor is terminated. Note that this
+ // event coincides with the receipt of terminal UPDATE events for any
+ // active tasks belonging to the slave or executor and receipt of
+ // 'Rescind' events for any outstanding offers belonging to the
+ // slave. Note that there is no guaranteed order between the
+ // 'Failure', 'Update' and 'Rescind' events when a slave or executor
+ // is removed.
// TODO(vinod): Consider splitting the lost slave and terminated
- // executor into separate events.
+ // executor into separate events and ensure it's reliably generated.
message Failure {
optional SlaveID slave_id = 1;
@@ -78,28 +109,33 @@ message Event {
optional int32 status = 3;
}
+ // Received when an invalid framework (e.g., unauthenticated,
+ // unauthorized) attempts to subscribe with the master. Error can
+ // also be received if the scheduler sends invalid Calls (e.g., not
+ // properly initialized).
+ // TODO(vinod): Remove this once the old scheduler driver is no
+ // longer supported. With HTTP API all errors will be signaled via
+ // HTTP response codes.
message Error {
required string message = 1;
}
- // TODO(benh): Add a 'from' or 'sender'.
-
// Type of the event, indicates which optional field below should be
// present if that type has a nested message definition.
required Type type = 1;
optional Subscribed subscribed = 2;
- optional Offers offers = 4;
- optional Rescind rescind = 5;
- optional Update update = 6;
- optional Message message = 7;
- optional Failure failure = 8;
- optional Error error = 9;
+ optional Offers offers = 3;
+ optional Rescind rescind = 4;
+ optional Update update = 5;
+ optional Message message = 6;
+ optional Failure failure = 7;
+ optional Error error = 8;
}
/**
- * Low-level scheduler call API.
+ * Scheduler call API.
*
* Like Event, a Call is described using the standard protocol buffer
* "union" trick (see above).
@@ -108,16 +144,16 @@ message Call {
// Possible call types, followed by message definitions if
// applicable.
enum Type {
- SUBSCRIBE = 1; // See 'framework_info' below.
- TEARDOWN = 3; // Shuts down all tasks and executors.
- REVIVE = 6;
- DECLINE = 5;
- ACCEPT = 12;
- KILL = 8;
- ACKNOWLEDGE = 9;
- RECONCILE = 10;
- MESSAGE = 11;
- SHUTDOWN = 13;
+ SUBSCRIBE = 1; // See 'framework_info' below.
+ TEARDOWN = 2; // Shuts down all tasks/executors and removes framework.
+ ACCEPT = 3; // See 'Accept' below.
+ DECLINE = 4; // See 'Decline' below.
+ REVIVE = 5; // Removes any previous filters set via ACCEPT or DECLINE.
+ KILL = 6; // See 'Kill' below.
+ SHUTDOWN = 7; // See 'Shutdown' below.
+ ACKNOWLEDGE = 8; // See 'Acknowledge' below.
+ RECONCILE = 9; // See 'Reconcile' below.
+ MESSAGE = 10; // See 'Message' below.
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
// already subscribed frameworks as a way of stopping offers from
@@ -128,11 +164,6 @@ message Call {
// something that is not an issue with the Event/Call API.
}
- message Decline {
- repeated OfferID offer_ids = 1;
- optional Filters filters = 2;
- }
-
// Accepts an offer, performing the specified operations
// in a sequential manner.
//
@@ -160,6 +191,22 @@ message Call {
optional Filters filters = 3;
}
+ // Declines an offer, signaling the master to potentially reoffer
+ // the resources to a different framework. Note that this is the same
+ // as sending an Accept call with no operations. See comments on
+ // top of 'Accept' for semantics.
+ message Decline {
+ repeated OfferID offer_ids = 1;
+ optional Filters filters = 2;
+ }
+
+ // Kills a specific task. If the scheduler has a custom executor,
+ // the kill is forwarded to the executor and it is up to the
+ // executor to kill the task and send a TASK_KILLED (or TASK_FAILED)
+ // update. Note that Mesos releases the resources for a task once it
+ // receives a terminal update (see TaskState in mesos.proto) for it.
+ // If the task is unknown to the master, a TASK_LOST update is
+ // generated.
message Kill {
required TaskID task_id = 1;
optional SlaveID slave_id = 2;
@@ -177,6 +224,11 @@ message Call {
required SlaveID slave_id = 2;
}
+ // Acknowledges the receipt of a status update. Schedulers are
+ // responsible for explicitly acknowledging the receipt of status
+ // updates that have the 'Update.status().uuid()' field set. Such status
+ // updates are retried by the slave until they are acknowledged by
+ // the scheduler.
message Acknowledge {
required SlaveID slave_id = 1;
required TaskID task_id = 2;
@@ -199,6 +251,9 @@ message Call {
repeated Task tasks = 1;
}
+ // Sends arbitrary binary data to the executor. Note that Mesos
+ // neither interprets this data nor makes any guarantees about the
+ // delivery of this message to the executor.
message Message {
required SlaveID slave_id = 1;
required ExecutorID executor_id = 2;
@@ -216,11 +271,11 @@ message Call {
// present if that type has a nested message definition.
required Type type = 2;
+ optional Accept accept = 3;
optional Decline decline = 4;
- optional Accept accept = 10;
- optional Kill kill = 6;
+ optional Kill kill = 5;
+ optional Shutdown shutdown = 6;
optional Acknowledge acknowledge = 7;
optional Reconcile reconcile = 8;
optional Message message = 9;
- optional Shutdown shutdown = 11;
}
[06/11] mesos git commit: Updated RECONCILE call to optionally
specify a slave id.
Posted by vi...@apache.org.
Updated RECONCILE call to optionally specify a slave id.
Review: https://reviews.apache.org/r/32502/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f95fa119
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f95fa119
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f95fa119
Branch: refs/heads/master
Commit: f95fa119044c9a11c8473ab088e948e7e1c1334d
Parents: 978e72d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 16:54:36 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 20 +++--
src/master/master.cpp | 32 +++++++-
src/master/master.hpp | 4 +
src/scheduler/scheduler.cpp | 6 +-
src/tests/scheduler_tests.cpp | 107 +++++++++++++++++++++++++++
5 files changed, 156 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 928995a..51bfe8d 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -176,16 +176,20 @@ message Call {
required bytes uuid = 3;
}
- // Allows the framework to query the status for non-terminal tasks.
+ // Allows the scheduler to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
- // each task in 'statuses', if possible. Tasks that are no longer
- // known will result in a TASK_LOST update. If statuses is empty,
- // then the master will send the latest status for each task
- // currently known.
- // TODO(bmahler): Add a guiding document for reconciliation or
- // document reconciliation in-depth here.
+ // each task in 'tasks', if possible. Tasks that are no longer known
+ // will result in a TASK_LOST update. If 'tasks' is empty, then the
+ // master will send the latest status for each task currently known.
message Reconcile {
- repeated TaskStatus statuses = 1; // Should be non-terminal only.
+ // TODO(vinod): Support arbitrary queries than just state of tasks.
+ message Task {
+ required TaskID task_id = 1;
+ optional SlaveID slave_id = 2;
+ }
+
+ repeated Task tasks = 1;
}
message Message {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c9c2cc2..e762d56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1603,9 +1603,16 @@ void Master::receive(
accept(framework, call.accept());
break;
+ case scheduler::Call::RECONCILE:
+ if (!call.has_reconcile()) {
+ drop(from, call, "Expecting 'reconcile' to be present");
+ return;
+ }
+ reconcile(framework, call.reconcile());
+ break;
+
case scheduler::Call::KILL:
case scheduler::Call::ACKNOWLEDGE:
- case scheduler::Call::RECONCILE:
case scheduler::Call::MESSAGE:
drop(from, call, "Unimplemented");
break;
@@ -3477,6 +3484,29 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
}
+void Master::reconcile(
+ Framework* framework,
+ const scheduler::Call::Reconcile& reconcile)
+{
+ CHECK_NOTNULL(framework);
+
+ // Construct 'TaskStatus'es from 'Reconcile::Task's.
+ vector<TaskStatus> statuses;
+ foreach (const scheduler::Call::Reconcile::Task& task, reconcile.tasks()) {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.set_state(TASK_RUNNING); // Dummy status.
+ if (task.has_slave_id()) {
+ status.mutable_slave_id()->CopyFrom(task.slave_id());
+ }
+
+ statuses.push_back(status);
+ }
+
+ _reconcileTasks(framework, statuses);
+}
+
+
void Master::reconcileTasks(
const UPID& from,
const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 59d6015..ec17a60 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -453,6 +453,10 @@ private:
const scheduler::Call::Accept& accept,
const process::Future<std::list<process::Future<bool>>>& authorizations);
+ void reconcile(
+ Framework* framework,
+ const scheduler::Call::Reconcile& reconcile);
+
bool elected() const
{
return leader.isSome() && leader.get() == info_;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index e80a0dc..d417442 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -314,10 +314,8 @@ public:
drop(call, "Expecting 'reconcile' to be present");
return;
}
- ReconcileTasksMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
- send(master.get(), message);
+
+ send(master.get(), call);
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f95fa119/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4ea5528..4911920 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -211,6 +211,113 @@ TEST_F(SchedulerTest, TaskRunning)
}
+TEST_F(SchedulerTest, ReconcileTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ scheduler::Mesos mesos(
+ master.get(),
+ DEFAULT_CREDENTIAL,
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.set_type(Call::REGISTER);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+ FrameworkID id(event.get().registered().framework_id());
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_NE(0, event.get().offers().offers().size());
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Offer offer = event.get().offers().offers(0);
+ TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::RECONCILE);
+
+ Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
+ task->mutable_task_id()->CopyFrom(taskInfo.task_id());
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+ EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+ event.get().update().status().reason());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
[04/11] mesos git commit: Added SHUTDOWN scheduler call.
Posted by vi...@apache.org.
Added SHUTDOWN scheduler call.
Review: https://reviews.apache.org/r/32505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d447e76
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d447e76
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d447e76
Branch: refs/heads/master
Commit: 2d447e762a4a11e7b7f916b045acc90c65b51b90
Parents: 6a553a6
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Mar 23 18:02:46 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 16 ++++
src/master/master.cpp | 52 +++++++++-
src/master/master.hpp | 4 +
src/messages/messages.proto | 11 ++-
src/scheduler/scheduler.cpp | 24 +++++
src/slave/slave.cpp | 84 +++++++++++++++-
src/slave/slave.hpp | 33 ++++---
src/tests/scheduler_tests.cpp | 137 ++++++++++++++++++++++++---
8 files changed, 324 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 5a94884..7a77fe1 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -71,6 +71,8 @@ message Event {
required bytes data = 3;
}
+ // TODO(vinod): Consider splitting the lost slave and terminated
+ // executor into separate events.
message Failure {
optional SlaveID slave_id = 1;
@@ -122,6 +124,7 @@ message Call {
ACKNOWLEDGE = 9;
RECONCILE = 10;
MESSAGE = 11;
+ SHUTDOWN = 13;
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
// already registered frameworks as a way of stopping offers from
@@ -169,6 +172,18 @@ message Call {
optional SlaveID slave_id = 2;
}
+ // Shuts down a custom executor. When the executor gets a shutdown
+ // event, it is expected to kill all its tasks (and send TASK_KILLED
+ // updates) and terminate. If the executor doesn't terminate within
+ // a certain timeout (configurable via the
+ // '--executor_shutdown_grace_period' slave flag), the slave will
+ // forcefully destroy the container (executor and its tasks) and
+ // transition its active tasks to TASK_LOST.
+ message Shutdown {
+ required ExecutorID executor_id = 1;
+ required SlaveID slave_id = 2;
+ }
+
message Acknowledge {
required SlaveID slave_id = 1;
required TaskID task_id = 2;
@@ -212,4 +227,5 @@ message Call {
optional Acknowledge acknowledge = 7;
optional Reconcile reconcile = 8;
optional Message message = 9;
+ optional Shutdown shutdown = 11;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc20be9..d443c80 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1611,6 +1611,13 @@ void Master::receive(
reconcile(framework, call.reconcile());
break;
+ case scheduler::Call::SHUTDOWN:
+ // A SHUTDOWN call without its 'shutdown' payload is malformed:
+ // drop it and stop, exactly like the RECONCILE case above, instead
+ // of also handing a default-constructed message to 'shutdown()'.
+ if (!call.has_shutdown()) {
+ drop(from, call, "Expecting 'shutdown' to be present");
+ return;
+ }
+ shutdown(framework, call.shutdown());
+ break;
+
case scheduler::Call::KILL:
if (!call.has_kill()) {
drop(from, call, "Expecting 'kill' to be present");
@@ -3484,13 +3491,52 @@ void Master::exitedExecutor(
LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
- << " on slave " << *slave << " "
+ << " on slave " << *slave << ": "
<< WSTRINGIFY(status);
removeExecutor(slave, frameworkId, executorId);
- // TODO(benh): Send the framework its executor's exit status?
- // Or maybe at least have something like Scheduler::executorLost?
+ // TODO(vinod): Reliably forward this message to the scheduler.
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Not forwarding exited executor message for executor '" << executorId
+ << "' of framework " << frameworkId << " on slave " << *slave
+ << " because the framework is unknown";
+
+ return;
+ }
+
+ ExitedExecutorMessage message;
+ message.mutable_executor_id()->CopyFrom(executorId);
+ message.mutable_framework_id()->CopyFrom(frameworkId);
+ message.mutable_slave_id()->CopyFrom(slaveId);
+ message.set_status(status);
+
+ send(framework->pid, message);
+}
+
+
+void Master::shutdown(
+ Framework* framework,
+ const scheduler::Call::Shutdown& shutdown)
+{
+ CHECK_NOTNULL(framework);
+
+ if (!slaves.registered.contains(shutdown.slave_id())) {
+ LOG(WARNING) << "Unable to shutdown executor '" << shutdown.executor_id()
+ << "' of framework " << framework->id()
+ << " of unknown slave " << shutdown.slave_id();
+ return;
+ }
+
+ Slave* slave = slaves.registered[shutdown.slave_id()];
+ CHECK_NOTNULL(slave);
+
+ ShutdownExecutorMessage message;
+ message.mutable_executor_id()->CopyFrom(shutdown.executor_id());
+ message.mutable_framework_id()->CopyFrom(framework->id());
+ send(slave->pid, message);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5d14a53..bf1661a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -461,6 +461,10 @@ private:
Framework* framework,
const scheduler::Call::Kill& kill);
+ void shutdown(
+ Framework* framework,
+ const scheduler::Call::Shutdown& shutdown);
+
bool elected() const
{
return leader.isSome() && leader.get() == info_;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index bdf474b..98d859f 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -309,9 +309,14 @@ message ShutdownFrameworkMessage {
}
-// Tells the executor to initiate a shut down by invoking
-// Executor::shutdown.
-message ShutdownExecutorMessage {}
+// Tells a slave (and consequently the executor) to shut down an executor.
+message ShutdownExecutorMessage {
+ // TODO(vinod): Make these fields required. These are made optional
+ // for backwards compatibility between 0.23.0 slave and pre 0.23.0
+ // executor driver.
+ optional ExecutorID executor_id = 1;
+ optional FrameworkID framework_id = 2;
+}
message UpdateFrameworkMessage {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 2bbb221..7d53d51 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -292,6 +292,15 @@ public:
break;
}
+ case Call::SHUTDOWN: {
+ if (!call.has_shutdown()) {
+ drop(call, "Expecting 'shutdown' to be present");
+ return;
+ }
+ send(master.get(), call);
+ break;
+ }
+
case Call::ACKNOWLEDGE: {
if (!call.has_acknowledge()) {
drop(call, "Expecting 'acknowledge' to be present");
@@ -345,6 +354,7 @@ protected:
install<RescindResourceOfferMessage>(&MesosProcess::receive);
install<StatusUpdateMessage>(&MesosProcess::receive);
install<LostSlaveMessage>(&MesosProcess::receive);
+ install<ExitedExecutorMessage>(&MesosProcess::receive);
install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
install<FrameworkErrorMessage>(&MesosProcess::receive);
@@ -657,6 +667,20 @@ protected:
receive(from, event);
}
+ void receive(const UPID& from, const ExitedExecutorMessage& message)
+ {
+ Event event;
+ event.set_type(Event::FAILURE);
+
+ Event::Failure* failure = event.mutable_failure();
+
+ failure->mutable_slave_id()->CopyFrom(message.slave_id());
+ failure->mutable_executor_id()->CopyFrom(message.executor_id());
+ failure->set_status(message.status());
+
+ receive(from, event);
+ }
+
void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
{
Event event;
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f68a005..e531283 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -382,6 +382,11 @@ void Slave::initialize()
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
+ install<ShutdownExecutorMessage>(
+ &Slave::shutdownExecutor,
+ &ShutdownExecutorMessage::framework_id,
+ &ShutdownExecutorMessage::executor_id);
+
install<ShutdownFrameworkMessage>(
&Slave::shutdownFramework,
&ShutdownFrameworkMessage::framework_id);
@@ -1792,7 +1797,7 @@ void Slave::shutdownFramework(
if (executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING) {
- shutdownExecutor(framework, executor);
+ _shutdownExecutor(framework, executor);
} else if (executor->state == Executor::TERMINATED) {
// NOTE: We call remove here to ensure we can remove an
// executor (of a terminating framework) that is terminated
@@ -2578,7 +2583,7 @@ void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
<< " of framework " << update.framework_id()
<< " which is not allowed. Shutting down the executor";
- shutdownExecutor(framework, executor);
+ _shutdownExecutor(framework, executor);
return;
}
@@ -3273,6 +3278,7 @@ void Slave::executorTerminated(
// Only send ExitedExecutorMessage if it is not a Command
// Executor because the master doesn't store them; they are
// generated by the slave.
+ // TODO(vinod): Reliably forward this message to the master.
if (!executor->isCommandExecutor()) {
ExitedExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
@@ -3455,7 +3461,76 @@ void _unmonitor(
}
-void Slave::shutdownExecutor(Framework* framework, Executor* executor)
+// Handles a 'ShutdownExecutorMessage' (sent by the master on behalf of a
+// scheduler's SHUTDOWN call). The request is logged and ignored when it
+// does not come from the registered master, when the slave is not yet
+// registered, or when the framework/executor is unknown or already
+// terminating; otherwise the executor shutdown is started.
+void Slave::shutdownExecutor(
+ const UPID& from,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
+{
+ if (from && master != from) {
+ LOG(WARNING) << "Ignoring shutdown executor message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " from " << from << " because it is not from the"
+ << " registered master ("
+ << (master.isSome() ? stringify(master.get()) : "None") << ")";
+ return;
+ }
+
+ LOG(INFO) << "Asked to shut down executor '" << executorId
+ << "' of framework " << frameworkId << " by " << from;
+
+ CHECK(state == RECOVERING || state == DISCONNECTED ||
+ state == RUNNING || state == TERMINATING)
+ << state;
+
+ if (state == RECOVERING || state == DISCONNECTED) {
+ LOG(WARNING) << "Ignoring shutdown executor message for executor '"
+ << executorId << "' of framework " << frameworkId
+ << " because the slave has not yet registered with the master";
+ return;
+ }
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING) << "Cannot shut down executor '" << executorId
+ << "' of unknown framework " << frameworkId;
+ return;
+ }
+
+ CHECK(framework->state == Framework::RUNNING ||
+ framework->state == Framework::TERMINATING)
+ << framework->state;
+
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring shutdown executor '" << executorId
+ << "' of framework " << frameworkId
+ << " because the framework is terminating";
+ return;
+ }
+
+ // Must return here: indexing 'executors' below with an unknown id
+ // would not yield a valid executor to dereference.
+ if (!framework->executors.contains(executorId)) {
+ LOG(WARNING) << "Ignoring shutdown of unknown executor '" << executorId
+ << "' of framework " << frameworkId;
+ return;
+ }
+
+ Executor* executor = framework->executors[executorId];
+ CHECK(executor->state == Executor::REGISTERING ||
+ executor->state == Executor::RUNNING ||
+ executor->state == Executor::TERMINATING ||
+ executor->state == Executor::TERMINATED)
+ << executor->state;
+
+ // Must return here: '_shutdownExecutor' CHECKs that the executor is
+ // REGISTERING or RUNNING, so a terminating/terminated executor
+ // reaching it would abort the slave.
+ if (executor->state == Executor::TERMINATING ||
+ executor->state == Executor::TERMINATED) {
+ LOG(WARNING) << "Ignoring shutdown executor '" << executorId
+ << "' of framework " << frameworkId
+ << " because the executor is terminating/terminated";
+ return;
+ }
+
+ _shutdownExecutor(framework, executor);
+}
+
+
+void Slave::_shutdownExecutor(Framework* framework, Executor* executor)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
@@ -3467,7 +3542,6 @@ void Slave::shutdownExecutor(Framework* framework, Executor* executor)
framework->state == Framework::TERMINATING)
<< framework->state;
-
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING)
<< executor->state;
@@ -3774,7 +3848,7 @@ Future<Nothing> Slave::_recover()
<< "' of framework " << framework->id()
<< " to " << executor->pid;
- shutdownExecutor(framework, executor);
+ _shutdownExecutor(framework, executor);
} else {
LOG(INFO) << "Killing executor '" << executor->id
<< "' of framework " << framework->id()
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d214ddb..1b8c512 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -125,6 +125,11 @@ public:
const FrameworkID& frameworkId,
const TaskID& taskId);
+ void shutdownExecutor(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
void shutdownFramework(
const process::UPID& from,
const FrameworkID& frameworkId);
@@ -240,13 +245,6 @@ public:
// and os calls.
void _checkDiskUsage(const process::Future<double>& usage);
- // Shut down an executor. This is a two phase process. First, an
- // executor receives a shut down message (shut down phase), then
- // after a configurable timeout the slave actually forces a kill
- // (kill phase, via the isolator) if the executor has not
- // exited.
- void shutdownExecutor(Framework* framework, Executor* executor);
-
// Invoked whenever the detector detects a change in masters.
// Made public for testing purposes.
void detected(const process::Future<Option<MasterInfo> >& pid);
@@ -299,13 +297,6 @@ public:
const FrameworkID& frameworkId,
const TaskInfo& task);
- // Handle the second phase of shutting down an executor for those
- // executors that have not properly shutdown within a timeout.
- void shutdownExecutorTimeout(
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const ContainerID& containerId);
-
// Shuts down the executor if it did not register yet.
void registerExecutorTimeout(
const FrameworkID& frameworkId,
@@ -360,6 +351,20 @@ private:
void _authenticate();
void authenticationTimeout(process::Future<bool> future);
+ // Shut down an executor. This is a two phase process. First, an
+ // executor receives a shut down message (shut down phase), then
+ // after a configurable timeout the slave actually forces a kill
+ // (kill phase, via the isolator) if the executor has not
+ // exited.
+ void _shutdownExecutor(Framework* framework, Executor* executor);
+
+ // Handle the second phase of shutting down an executor for those
+ // executors that have not properly shut down within a timeout.
+ void shutdownExecutorTimeout(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId);
+
// Inner class used to namespace HTTP route handlers (see
// slave/http.cpp for implementations).
class Http
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d447e76/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a1e49af..a59b146 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -24,6 +24,7 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
@@ -108,14 +109,14 @@ ACTION_P(Enqueue, queue)
TEST_F(SchedulerTest, TaskRunning)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- Try<PID<Slave> > slave = StartSlave(&containerizer);
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
@@ -213,14 +214,14 @@ TEST_F(SchedulerTest, TaskRunning)
TEST_F(SchedulerTest, ReconcileTask)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- Try<PID<Slave> > slave = StartSlave(&containerizer);
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
@@ -442,6 +443,118 @@ TEST_F(SchedulerTest, KillTask)
}
+TEST_F(SchedulerTest, ShutdownExecutor)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ scheduler::Mesos mesos(
+ master.get(),
+ DEFAULT_CREDENTIAL,
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.set_type(Call::REGISTER);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+ FrameworkID id(event.get().registered().framework_id());
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_NE(0, event.get().offers().offers().size());
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ Offer offer = event.get().offers().offers(0);
+ TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_FINISHED, event.get().update().status().state());
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::SHUTDOWN);
+
+ Call::Shutdown* shutdown = call.mutable_shutdown();
+ shutdown->mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+ shutdown->mutable_slave_id()->CopyFrom(offer.slave_id());
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(shutdown);
+ containerizer.destroy(id, DEFAULT_EXECUTOR_ID);
+
+ // Executor termination results in a 'FAILURE' event.
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::FAILURE, event.get().type());
+ ExecutorID executorId(DEFAULT_EXECUTOR_ID);
+ EXPECT_EQ(executorId, event.get().failure().executor_id());
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
@@ -451,7 +564,7 @@ class MesosSchedulerDriverTest : public MesosTest {};
TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
@@ -503,12 +616,12 @@ ACTION(StopAndAbort)
// abort(), no pending acknowledgements are sent.
TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- Try<PID<Slave> > slave = StartSlave(&containerizer);
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -562,12 +675,12 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
// the call to 'acknowledgeStatusUpdate' sends the ack to the master.
TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- Try<PID<Slave> > slave = StartSlave(&containerizer);
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -628,10 +741,10 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
// resources.
TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
- Try<PID<Slave> > slave = StartSlave();
+ Try<PID<Slave>> slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
@@ -696,7 +809,7 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
// generate a status with no slave id by performing reconciliation.
TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
{
- Try<PID<Master> > master = StartMaster();
+ Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
[05/11] mesos git commit: Removed MasterInfo from REGISTER and
REREGISTER scheduler calls.
Posted by vi...@apache.org.
Removed MasterInfo from REGISTER and REREGISTER scheduler calls.
Review: https://reviews.apache.org/r/32504
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d3c32697
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d3c32697
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d3c32697
Branch: refs/heads/master
Commit: d3c3269731b60b9b1232a3aa9d3b0d61742413d3
Parents: f95fa11
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 17:14:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:47 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 2 --
src/examples/low_level_scheduler_libprocess.cpp | 6 ++----
src/examples/low_level_scheduler_pthread.cpp | 6 ++----
src/scheduler/scheduler.cpp | 2 --
4 files changed, 4 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 51bfe8d..f347912 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -46,12 +46,10 @@ message Event {
message Registered {
required FrameworkID framework_id = 1;
- required MasterInfo master_info = 2;
}
message Reregistered {
required FrameworkID framework_id = 1;
- required MasterInfo master_info = 2;
}
message Offers {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index 2a388fe..fbfb0f7 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -124,8 +124,7 @@ public:
state = REGISTERED;
cout << "Framework '" << event.registered().framework_id().value()
- << "' registered with Master '"
- << event.registered().master_info().id() << "'" << endl;
+ << "' registered" << endl;
break;
}
@@ -135,8 +134,7 @@ public:
state = REGISTERED;
cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered to Master '"
- << event.reregistered().master_info().id() << "'" << endl;
+ << "' re-registered" << endl;
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 063b7f6..944573d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -145,8 +145,7 @@ public:
framework.mutable_id()->CopyFrom(event.registered().framework_id());
cout << "Framework '" << event.registered().framework_id().value()
- << "' registered with Master '"
- << event.registered().master_info().id() << "'" << endl;
+ << "' registered" << endl;
break;
}
@@ -158,8 +157,7 @@ public:
pthread_mutex_unlock(&mutex);
cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered with Master '"
- << event.reregistered().master_info().id() << "'" << endl;
+ << "' re-registered" << endl;
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d3c32697/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index d417442..8f0f374 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -576,7 +576,6 @@ protected:
Event::Registered* registered = event.mutable_registered();
registered->mutable_framework_id()->CopyFrom(message.framework_id());
- registered->mutable_master_info()->CopyFrom(message.master_info());
receive(from, event);
}
@@ -594,7 +593,6 @@ protected:
Event::Reregistered* reregistered = event.mutable_reregistered();
reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
- reregistered->mutable_master_info()->CopyFrom(message.master_info());
receive(from, event);
}
[08/11] mesos git commit: Renamed UNREGISTER call to TEARDOWN.
Posted by vi...@apache.org.
Renamed UNREGISTER call to TEARDOWN.
Review: https://reviews.apache.org/r/32845
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3de1e8e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3de1e8e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3de1e8e
Branch: refs/heads/master
Commit: c3de1e8ec9cf0f4c228eaca71c050e4735712c08
Parents: eb3c958
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 16:19:14 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 4 +-
src/examples/low_level_scheduler_libprocess.cpp | 2 +-
src/examples/low_level_scheduler_pthread.cpp | 2 +-
src/master/master.cpp | 5 +-
src/scheduler/scheduler.cpp | 6 +-
src/tests/scheduler_tests.cpp | 100 +++++++++++++++++++
6 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index aea5607..ec9adf6 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -110,7 +110,7 @@ message Call {
// applicable.
enum Type {
SUBSCRIBE = 1; // See 'framework_info' below.
- UNREGISTER = 3;
+ TEARDOWN = 3; // Shuts down all tasks and executors.
REVIVE = 6;
DECLINE = 5;
ACCEPT = 12;
@@ -208,7 +208,7 @@ message Call {
// Identifies who generated this call. Always necessary, but the
// only thing that needs to be set for certain calls, e.g.,
- // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must be always
+ // SUBSCRIBE and TEARDOWN. 'framework_info.id()' must be always
// set except when a brand new scheduler SUBSCRIBEs for the very
// first time.
required FrameworkInfo framework_info = 1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index cf85c76..b55ad60 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -311,7 +311,7 @@ private:
{
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(Call::UNREGISTER);
+ call.set_type(Call::TEARDOWN);
mesos.send(call);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 4af576d..64a0e44 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -358,7 +358,7 @@ private:
void finalize()
{
Call call;
- call.set_type(Call::UNREGISTER);
+ call.set_type(Call::TEARDOWN);
call.mutable_framework_info()->CopyFrom(framework);
mesos.send(call);
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 502d3ba..ce9d263 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1588,7 +1588,6 @@ void Master::receive(
// framework id is set and non-empty except for SUBSCRIBE call.
switch (call.type()) {
- case scheduler::Call::UNREGISTER:
case scheduler::Call::REVIVE:
case scheduler::Call::DECLINE:
drop(from, call, "Unimplemented");
@@ -1629,6 +1628,10 @@ void Master::receive(
drop(from, call, "Unimplemented");
break;
+ case scheduler::Call::TEARDOWN:
+ removeFramework(framework);
+ break;
+
default:
drop(from, call, "Unknown call type");
break;
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 82cbfcb..2047ee4 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -229,10 +229,8 @@ public:
break;
}
- case Call::UNREGISTER: {
- UnregisterFrameworkMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- send(master.get(), message);
+ case Call::TEARDOWN: {
+ send(master.get(), call);
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3de1e8e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index ddbb712..54d6bc9 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -555,6 +555,106 @@ TEST_F(SchedulerTest, ShutdownExecutor)
}
+TEST_F(SchedulerTest, Teardown)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ scheduler::Mesos mesos(
+ master.get(),
+ DEFAULT_CREDENTIAL,
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.set_type(Call::SUBSCRIBE);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+ FrameworkID id(event.get().subscribed().framework_id());
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_NE(0, event.get().offers().offers().size());
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Offer offer = event.get().offers().offers(0);
+ TaskInfo taskInfo = createTask(offer,"", DEFAULT_EXECUTOR_ID);
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
+
+ {
+ Call call;
+ call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+ call.set_type(Call::TEARDOWN);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(shutdown);
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
[09/11] mesos git commit: Added output stream operators for scheduler Calls and Events.
Posted by vi...@apache.org.
Added output stream operators for scheduler Calls and Events.
Review: https://reviews.apache.org/r/32506
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94cb038f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94cb038f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94cb038f
Branch: refs/heads/master
Commit: 94cb038f13d4793812fd0ae0eeb9c68ae2e4d152
Parents: 2d447e7
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Mar 25 11:36:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000
----------------------------------------------------------------------
include/mesos/type_utils.hpp | 22 +++++++++++++++++++++-
src/master/master.cpp | 8 ++++----
2 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/94cb038f/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index cdf5864..0446374 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -19,12 +19,16 @@
#ifndef __MESOS_TYPE_UTILS_H__
#define __MESOS_TYPE_UTILS_H__
+#include <ostream>
+
#include <boost/functional/hash.hpp>
#include <mesos/mesos.hpp>
#include <mesos/module/module.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <stout/uuid.hpp>
// This file includes definitions for operators on public protobuf
@@ -338,7 +342,23 @@ inline std::ostream& operator << (
std::ostream& stream,
const TaskState& state)
{
- return stream << TaskState_descriptor()->FindValueByNumber(state)->name();
+ return stream << TaskState_Name(state);
+}
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const scheduler::Call::Type& type)
+{
+ return stream << scheduler::Call_Type_Name(type);
+}
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const scheduler::Event::Type& type)
+{
+ return stream << scheduler::Event_Type_Name(type);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/94cb038f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d443c80..2c161a9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1526,10 +1526,10 @@ void Master::drop(
{
// TODO(bmahler): Increment a metric.
- LOG(ERROR) << "Dropping " << scheduler::Call::Type_Name(call.type())
- << " call from framework " << call.framework_info().id()
- << " (" << call.framework_info().name() << ") at " << from
- << ": " << message;
+ LOG(ERROR) << "Dropping " << call.type() << " call"
+ << " from framework " << call.framework_info().id()
+ << " (" << call.framework_info().name()
+ << ") at " << from << ": " << message;
}
[02/11] mesos git commit: Removed LAUNCH call from scheduler.proto.
Posted by vi...@apache.org.
Removed LAUNCH call from scheduler.proto.
Review: https://reviews.apache.org/r/32500
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/79086eb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/79086eb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/79086eb1
Branch: refs/heads/master
Commit: 79086eb1e5c470d4b8e425ab5e26061fc84df14b
Parents: ec0a9f3
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Mar 20 15:57:42 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:46 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 14 ++----
src/examples/low_level_scheduler_libprocess.cpp | 11 +++--
src/examples/low_level_scheduler_pthread.cpp | 11 +++--
src/master/master.cpp | 3 +-
src/scheduler/scheduler.cpp | 50 +-------------------
src/tests/scheduler_tests.cpp | 12 +++--
6 files changed, 29 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 783a63a..ce401aa 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -121,7 +121,6 @@ message Call {
REVIVE = 6;
DECLINE = 5;
ACCEPT = 12;
- LAUNCH = 7;
KILL = 8;
ACKNOWLEDGE = 9;
RECONCILE = 10;
@@ -162,20 +161,16 @@ message Call {
// ]
// }
//
- // TODO(bmahler): Not implemented.
+ // Note that any of the offer’s resources not used in the 'Accept'
+ // call (e.g., to launch a task) are considered unused and might be
+ // reoffered to other frameworks. In other words, the same OfferID
+ // cannot be used in more than one 'Accept' call.
message Accept {
repeated OfferID offer_ids = 1;
repeated Offer.Operation operations = 2;
optional Filters filters = 3;
}
- // TODO(bmahler): Deprecate Launch in favor of Accept.
- message Launch {
- repeated TaskInfo task_infos = 1;
- repeated OfferID offer_ids = 2;
- optional Filters filters = 3;
- }
-
message Kill {
required TaskID task_id = 1;
}
@@ -216,7 +211,6 @@ message Call {
optional Request request = 3;
optional Decline decline = 4;
optional Accept accept = 10;
- optional Launch launch = 5;
optional Kill kill = 6;
optional Acknowledge acknowledge = 7;
optional Reconcile reconcile = 8;
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index 63d34ee..2a388fe 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -249,13 +249,16 @@ private:
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(Call::LAUNCH);
+ call.set_type(Call::ACCEPT);
- Call::Launch* launch = call.mutable_launch();
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
foreach (const TaskInfo& taskInfo, tasks) {
- launch->add_task_infos()->CopyFrom(taskInfo);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
}
- launch->add_offer_ids()->CopyFrom(offer.id());
mesos.send(call);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 6d1f938..063b7f6 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -298,14 +298,17 @@ private:
}
Call call;
- call.set_type(Call::LAUNCH);
call.mutable_framework_info()->CopyFrom(framework);
+ call.set_type(Call::ACCEPT);
- Call::Launch* launch = call.mutable_launch();
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
foreach (const TaskInfo& taskInfo, tasks) {
- launch->add_task_infos()->CopyFrom(taskInfo);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
}
- launch->add_offer_ids()->CopyFrom(offer.id());
mesos.send(call);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f3462d1..865ff89 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1554,6 +1554,8 @@ void Master::receive(
const UPID& from,
const scheduler::Call& call)
{
+ // TODO(vinod): Add metrics for calls.
+
const FrameworkInfo& frameworkInfo = call.framework_info();
// For REGISTER and REREGISTER calls, no need to look up the
@@ -1602,7 +1604,6 @@ void Master::receive(
accept(framework, call.accept());
break;
- case scheduler::Call::LAUNCH:
case scheduler::Call::KILL:
case scheduler::Call::ACKNOWLEDGE:
case scheduler::Call::RECONCILE:
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index bd9fced..6fbd991 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -295,33 +295,6 @@ public:
break;
}
- case Call::LAUNCH: {
- if (!call.has_launch()) {
- drop(call, "Expecting 'launch' to be present");
- return;
- }
- // We do some local validation here, but really this should
- // all happen in the master so it's only implemented once.
- foreach (TaskInfo& task,
- *call.mutable_launch()->mutable_task_infos()) {
- // Set ExecutorInfo::framework_id if missing since this
- // field was added to the API later and thus was made
- // optional.
- if (task.has_executor() && !task.executor().has_framework_id()) {
- task.mutable_executor()->mutable_framework_id()->CopyFrom(
- call.framework_info().id());
- }
- }
-
- LaunchTasksMessage message;
- message.mutable_framework_id()->CopyFrom(call.framework_info().id());
- message.mutable_filters()->CopyFrom(call.launch().filters());
- message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids());
- message.mutable_tasks()->CopyFrom(call.launch().task_infos());
- send(master.get(), message);
- break;
- }
-
case Call::KILL: {
if (!call.has_kill()) {
drop(call, "Expecting 'kill' to be present");
@@ -766,28 +739,7 @@ protected:
void drop(const Call& call, const string& message)
{
- VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message;
-
- switch (call.type()) {
- case Call::LAUNCH: {
- // We drop the tasks preemptively (enqueing update events that
- // put the task in TASK_LOST). This is a hack for now, to keep
- // the tasks from being forever in PENDING state, when
- // actually the master never received the launch.
- // Unfortuantely this is insufficient since it doesn't capture
- // the case when the scheduler process sends it but the master
- // never receives it (i.e., during a master failover). In the
- // future, this should be solved by higher-level abstractions
- // and this hack should be considered for removal.
- foreach (const TaskInfo& task, call.launch().task_infos()) {
- drop(task, message);
- }
- break;
- }
-
- default:
- break;
- }
+ LOG(WARNING) << "Dropping " << call.type() << ": " << message;
}
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/79086eb1/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4a89a7a..4ea5528 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -185,10 +185,14 @@ TEST_F(SchedulerTest, TaskRunning)
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
call.mutable_framework_info()->mutable_id()->CopyFrom(id);
- call.set_type(Call::LAUNCH);
- call.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
- call.mutable_launch()->add_offer_ids()->CopyFrom(
- event.get().offers().offers(0).id());
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(event.get().offers().offers(0).id());
+
+ Offer::Operation* operation = accept->add_operations();
+ operation->set_type(Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
[10/11] mesos git commit: Added SUBSCRIBE call and SUBSCRIBED event.
Posted by vi...@apache.org.
Added SUBSCRIBE call and SUBSCRIBED event.
Review: https://reviews.apache.org/r/32844
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eb3c958e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eb3c958e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eb3c958e
Branch: refs/heads/master
Commit: eb3c958e3932a823115a502c6b0e57a29b54bf94
Parents: 94cb038
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Apr 3 14:35:28 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Apr 25 11:32:48 2015 -1000
----------------------------------------------------------------------
include/mesos/scheduler/scheduler.proto | 22 ++++----
src/examples/low_level_scheduler_libprocess.cpp | 28 +++-------
src/examples/low_level_scheduler_pthread.cpp | 28 +++-------
src/master/master.cpp | 13 +++--
src/scheduler/scheduler.cpp | 57 +++++++++-----------
src/tests/scheduler_tests.cpp | 24 ++++-----
6 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 7a77fe1..aea5607 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -34,8 +34,7 @@ message Event {
// Possible event types, followed by message definitions if
// applicable.
enum Type {
- REGISTERED = 1;
- REREGISTERED = 2;
+ SUBSCRIBED = 1;
OFFERS = 3;
RESCIND = 4;
UPDATE = 5;
@@ -44,11 +43,8 @@ message Event {
ERROR = 8;
}
- message Registered {
- required FrameworkID framework_id = 1;
- }
-
- message Reregistered {
+ // First event received when the scheduler subscribes.
+ message Subscribed {
required FrameworkID framework_id = 1;
}
@@ -93,8 +89,7 @@ message Event {
// present if that type has a nested message definition.
required Type type = 1;
- optional Registered registered = 2;
- optional Reregistered reregistered = 3;
+ optional Subscribed subscribed = 2;
optional Offers offers = 4;
optional Rescind rescind = 5;
optional Update update = 6;
@@ -114,8 +109,7 @@ message Call {
// Possible call types, followed by message definitions if
// applicable.
enum Type {
- REGISTER = 1;
- REREGISTER = 2;
+ SUBSCRIBE = 1; // See 'framework_info' below.
UNREGISTER = 3;
REVIVE = 6;
DECLINE = 5;
@@ -127,7 +121,7 @@ message Call {
SHUTDOWN = 13;
// TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
- // already registered frameworks as a way of stopping offers from
+ // already subscribed frameworks as a way of stopping offers from
// being generated and other events from being sent by the master.
// Note that this functionality existed originally to support
// SchedulerDriver::abort which was only necessary to handle
@@ -214,7 +208,9 @@ message Call {
// Identifies who generated this call. Always necessary, but the
// only thing that needs to be set for certain calls, e.g.,
- // REGISTER, REREGISTER, and UNREGISTER.
+ // SUBSCRIBE and UNREGISTER. 'framework_info.id()' must be always
+ // set except when a brand new scheduler SUBSCRIBEs for the very
+ // first time.
required FrameworkInfo framework_info = 1;
// Type of the call, indicates which optional field below should be
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index fbfb0f7..cf85c76 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -117,24 +117,13 @@ public:
events.pop();
switch (event.type()) {
- case Event::REGISTERED: {
- cout << endl << "Received a REGISTERED event" << endl;
+ case Event::SUBSCRIBED: {
+ cout << endl << "Received a SUBSCRIBED event" << endl;
- framework.mutable_id()->CopyFrom(event.registered().framework_id());
- state = REGISTERED;
+ framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+ state = SUBSCRIBED;
- cout << "Framework '" << event.registered().framework_id().value()
- << "' registered" << endl;
- break;
- }
-
- case Event::REREGISTERED: {
- cout << endl << "Received a REREGISTERED event" << endl;
-
- state = REGISTERED;
-
- cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered" << endl;
+ cout << "Subscribed with ID '" << framework.id() << endl;
break;
}
@@ -303,14 +292,13 @@ private:
void doReliableRegistration()
{
- if (state == REGISTERED) {
+ if (state == SUBSCRIBED) {
return;
}
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(
- state == INITIALIZING ? Call::REGISTER : Call::REREGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
@@ -334,7 +322,7 @@ private:
enum State {
INITIALIZING = 0,
- REGISTERED = 1,
+ SUBSCRIBED = 1,
DISCONNECTED = 2
} state;
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 944573d..4af576d 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -135,29 +135,16 @@ public:
events.pop();
switch (event.type()) {
- case Event::REGISTERED: {
- cout << endl << "Received a REGISTERED event" << endl;
+ case Event::SUBSCRIBED: {
+ cout << endl << "Received a SUBSCRIBED event" << endl;
pthread_mutex_lock(&mutex);
- state = REGISTERED;
+ state = SUBSCRIBED;
pthread_mutex_unlock(&mutex);
- framework.mutable_id()->CopyFrom(event.registered().framework_id());
+ framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
- cout << "Framework '" << event.registered().framework_id().value()
- << "' registered" << endl;
- break;
- }
-
- case Event::REREGISTERED: {
- cout << endl << "Received a REREGISTERED event" << endl;
-
- pthread_mutex_lock(&mutex);
- state = REGISTERED;
- pthread_mutex_unlock(&mutex);
-
- cout << "Framework '" << event.reregistered().framework_id().value()
- << "' re-registered" << endl;
+ cout << "Subscribed with ID '" << framework.id() << endl;
break;
}
@@ -362,8 +349,7 @@ private:
Call call;
call.mutable_framework_info()->CopyFrom(framework);
- call.set_type(
- state == CONNECTED ? Call::REGISTER : Call::REREGISTER);
+ call.set_type(Call::SUBSCRIBE);
pthread_mutex_unlock(&mutex);
mesos.send(call);
@@ -390,7 +376,7 @@ private:
enum State {
INITIALIZING = 0,
CONNECTED = 1,
- REGISTERED = 2,
+ SUBSCRIBED = 2,
DISCONNECTED = 3,
DONE = 4
} state;
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2c161a9..502d3ba 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1555,15 +1555,13 @@ void Master::receive(
const scheduler::Call& call)
{
// TODO(vinod): Add metrics for calls.
-
+ // TODO(vinod): Implement the unimplemented calls.
const FrameworkInfo& frameworkInfo = call.framework_info();
- // For REGISTER and REREGISTER calls, no need to look up the
- // framework. Therefore, we handle them first and separately from
- // other types of calls.
+ // For SUBSCRIBE call, no need to look up the framework. Therefore,
+ // we handle them first and separately from other types of calls.
switch (call.type()) {
- case scheduler::Call::REGISTER:
- case scheduler::Call::REREGISTER:
+ case scheduler::Call::SUBSCRIBE:
drop(from, call, "Unimplemented");
return;
@@ -1586,7 +1584,8 @@ void Master::receive(
}
// TODO(jieyu): Validate frameworkInfo to make sure it's the same as
- // the one that the framework used during registration.
+ // the one that the framework used during registration and that the
+ // framework id is set and non-empty except for SUBSCRIBE call.
switch (call.type()) {
case scheduler::Call::UNREGISTER:
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 7d53d51..82cbfcb 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -200,9 +200,10 @@ public:
call.mutable_framework_info()->set_user(user.get());
}
- // Only a REGISTER should not have set the framework ID.
- if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
- drop(call, "Call is mising FrameworkInfo.id");
+ // Only a SUBSCRIBE call may not have set the framework ID.
+ if (call.type() != Call::SUBSCRIBE &&
+ (!call.framework_info().has_id() || call.framework_info().id() == "")) {
+ drop(call, "Call is missing FrameworkInfo.id");
return;
}
@@ -213,18 +214,18 @@ public:
}
switch (call.type()) {
- case Call::REGISTER: {
- RegisterFrameworkMessage message;
- message.mutable_framework()->CopyFrom(call.framework_info());
- send(master.get(), message);
- break;
- }
-
- case Call::REREGISTER: {
- ReregisterFrameworkMessage message;
- message.mutable_framework()->CopyFrom(call.framework_info());
- message.set_failover(failover);
- send(master.get(), message);
+ case Call::SUBSCRIBE: {
+ if (!call.framework_info().has_id() ||
+ call.framework_info().id() == "") {
+ RegisterFrameworkMessage message;
+ message.mutable_framework()->CopyFrom(call.framework_info());
+ send(master.get(), message);
+ } else {
+ ReregisterFrameworkMessage message;
+ message.mutable_framework()->CopyFrom(call.framework_info());
+ message.set_failover(failover);
+ send(master.get(), message);
+ }
break;
}
@@ -386,6 +387,7 @@ protected:
VLOG(1) << "New master detected at " << master.get();
if (credential.isSome()) {
+ // TODO(vinod): Do pure HTTP Authentication instead of SASL.
// Authenticate with the master.
authenticate();
} else {
@@ -572,34 +574,27 @@ protected:
void receive(const UPID& from, const FrameworkRegisteredMessage& message)
{
- // We've now registered at least once with the master so we're no
- // longer failing over. See the comment where 'failover' is
- // declared for further details.
- failover = false;
-
- Event event;
- event.set_type(Event::REGISTERED);
-
- Event::Registered* registered = event.mutable_registered();
-
- registered->mutable_framework_id()->CopyFrom(message.framework_id());
-
- receive(from, event);
+ subscribed(from, message.framework_id());
}
void receive(const UPID& from, const FrameworkReregisteredMessage& message)
{
+ subscribed(from, message.framework_id());
+ }
+
+ void subscribed(const UPID& from, const FrameworkID& frameworkId)
+ {
// We've now registered at least once with the master so we're no
// longer failing over. See the comment where 'failover' is
// declared for further details.
failover = false;
Event event;
- event.set_type(Event::REREGISTERED);
+ event.set_type(Event::SUBSCRIBED);
- Event::Reregistered* reregistered = event.mutable_reregistered();
+ Event::Subscribed* subscribed = event.mutable_subscribed();
- reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
+ subscribed->mutable_framework_id()->CopyFrom(frameworkId);
receive(from, event);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/eb3c958e/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index a59b146..ddbb712 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -142,16 +142,16 @@ TEST_F(SchedulerTest, TaskRunning)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -247,16 +247,16 @@ TEST_F(SchedulerTest, ReconcileTask)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -354,16 +354,16 @@ TEST_F(SchedulerTest, KillTask)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
@@ -478,16 +478,16 @@ TEST_F(SchedulerTest, ShutdownExecutor)
{
Call call;
call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
- call.set_type(Call::REGISTER);
+ call.set_type(Call::SUBSCRIBE);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
- EXPECT_EQ(Event::REGISTERED, event.get().type());
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
- FrameworkID id(event.get().registered().framework_id());
+ FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);