You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:06:19 UTC
svn commit: r1132242 - in /incubator/mesos/trunk: include/ src/common/
src/examples/ src/examples/java/ src/exec/ src/launcher/ src/master/
src/messaging/ src/sched/ src/slave/ src/tests/
Author: benh
Date: Sun Jun 5 09:06:18 2011
New Revision: 1132242
URL: http://svn.apache.org/viewvc?rev=1132242&view=rev
Log:
Initial support for multiple executors per framework per slave.
Modified:
incubator/mesos/trunk/include/mesos.proto
incubator/mesos/trunk/src/common/foreach.hpp
incubator/mesos/trunk/src/common/type_utils.hpp
incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java
incubator/mesos/trunk/src/examples/java/TestFramework.java
incubator/mesos/trunk/src/examples/memhog.cpp
incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
incubator/mesos/trunk/src/exec/exec.cpp
incubator/mesos/trunk/src/launcher/launcher.cpp
incubator/mesos/trunk/src/launcher/launcher.hpp
incubator/mesos/trunk/src/launcher/main.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/messaging/messages.proto
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/isolation_module.hpp
incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
incubator/mesos/trunk/src/slave/main.cpp
incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/master_test.cpp
incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
incubator/mesos/trunk/src/tests/utils.hpp
Modified: incubator/mesos/trunk/include/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos.proto?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos.proto Sun Jun 5 09:06:18 2011
@@ -23,6 +23,11 @@ message TaskID {
}
+message ExecutorID {
+ required string value = 1;
+}
+
+
message Param {
required string key = 1;
required string value = 2;
@@ -99,6 +104,7 @@ message TaskDescription {
required SlaveID slave_id = 3;
repeated Resource resources = 4;
optional bytes data = 5;
+ optional ExecutorInfo executor = 6;
}
@@ -121,9 +127,10 @@ message TaskStatus {
message ExecutorInfo {
- required string uri = 1;
- optional Params params = 2;
- optional bytes data = 3;
+ required ExecutorID executor_id = 1;
+ required string uri = 2;
+ optional Params params = 3;
+ optional bytes data = 4;
}
@@ -144,7 +151,8 @@ message SlaveInfo {
message FrameworkMessage {
required SlaveID slave_id = 1;
- optional bytes data = 2;
+ required ExecutorID executor_id = 2;
+ optional bytes data = 3;
}
Modified: incubator/mesos/trunk/src/common/foreach.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/foreach.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/foreach.hpp (original)
+++ incubator/mesos/trunk/src/common/foreach.hpp Sun Jun 5 09:06:18 2011
@@ -29,6 +29,17 @@ namespace foreach {
const boost::tuples::detail::swallow_assign _ = boost::tuples::ignore;
+template <typename T> T copy(const T& t) { return t; }
+
}
+
+#define foreachcopy(VAR, COL) \
+ foreach (VAR, foreach::copy(COL))
+
+#define foreachpaircopy(VARFIRST, VARSECOND, COL) \
+ foreachpair (VARFIRST, VARSECOND, foreach::copy(COL))
+
+
+
#endif /* FOREACH_HPP */
Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Sun Jun 5 09:06:18 2011
@@ -47,6 +47,13 @@ inline std::ostream& operator << (std::o
}
+inline std::ostream& operator << (std::ostream& stream,
+ const ExecutorID& executorId)
+{
+ stream << executorId.value();
+ return stream;
+}
+
inline bool operator == (const FrameworkID& left, const FrameworkID& right)
{
return left.value() == right.value();
@@ -71,6 +78,12 @@ inline bool operator == (const TaskID& l
}
+inline bool operator == (const ExecutorID& left, const ExecutorID& right)
+{
+ return left.value() == right.value();
+}
+
+
inline bool operator == (const FrameworkID& left, const std::string& right)
{
return left.value() == right;
@@ -95,6 +108,12 @@ inline bool operator == (const TaskID& l
}
+inline bool operator == (const ExecutorID& left, const std::string& right)
+{
+ return left.value() == right;
+}
+
+
inline size_t hash_value(const FrameworkID& frameworkId)
{
size_t seed = 0;
@@ -127,6 +146,14 @@ inline size_t hash_value(const TaskID& t
}
+inline size_t hash_value(const ExecutorID& executorId)
+{
+ size_t seed = 0;
+ boost::hash_combine(seed, executorId.value());
+ return seed;
+}
+
+
namespace internal {
inline std::ostream& operator << (std::ostream& stream,
Modified: incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/cpp_test_framework.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/cpp_test_framework.cpp (original)
+++ incubator/mesos/trunk/src/examples/cpp_test_framework.cpp Sun Jun 5 09:06:18 2011
@@ -36,6 +36,7 @@ public:
virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
{
ExecutorInfo executor;
+ executor.mutable_executor_id()->set_value("default");
executor.set_uri(uri);
return executor;
}
Modified: incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java (original)
+++ incubator/mesos/trunk/src/examples/java/TestExceptionFramework.java Sun Jun 5 09:06:18 2011
@@ -18,7 +18,9 @@ public class TestExceptionFramework {
try {
File file = new File("./test_executor");
return ExecutorInfo.newBuilder()
- .setUri(file.getCanonicalPath()).build();
+ .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+ .setUri(file.getCanonicalPath())
+ .build();
} catch (Throwable t) {
throw new RuntimeException(t);
}
Modified: incubator/mesos/trunk/src/examples/java/TestFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/java/TestFramework.java?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/java/TestFramework.java (original)
+++ incubator/mesos/trunk/src/examples/java/TestFramework.java Sun Jun 5 09:06:18 2011
@@ -32,7 +32,9 @@ public class TestFramework {
try {
File file = new File("./test_executor");
return ExecutorInfo.newBuilder()
- .setUri(file.getCanonicalPath()).build();
+ .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+ .setUri(file.getCanonicalPath())
+ .build();
} catch (Throwable t) {
throw new RuntimeException(t);
}
Modified: incubator/mesos/trunk/src/examples/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/memhog.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/memhog.cpp Sun Jun 5 09:06:18 2011
@@ -36,6 +36,7 @@ public:
virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
{
ExecutorInfo executor;
+ executor.mutable_executor_id()->set_value("default");
executor.set_uri(uri);
return executor;
}
Modified: incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/scheduled_memhog.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/scheduled_memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/scheduled_memhog.cpp Sun Jun 5 09:06:18 2011
@@ -82,6 +82,7 @@ public:
virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
{
ExecutorInfo executor;
+ executor.mutable_executor_id()->set_value("default");
executor.set_uri(uri);
return executor;
}
Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun 5 09:06:18 2011
@@ -18,16 +18,16 @@
#include "messaging/messages.hpp"
-using std::cerr;
-using std::endl;
-using std::string;
+using namespace mesos;
+using namespace mesos::internal;
using boost::bind;
using boost::cref;
using boost::unordered_map;
-using namespace mesos;
-using namespace mesos::internal;
+using std::cerr;
+using std::endl;
+using std::string;
namespace mesos { namespace internal {
@@ -37,9 +37,10 @@ class ExecutorProcess : public MesosProc
public:
ExecutorProcess(const PID& _slave, MesosExecutorDriver* _driver,
Executor* _executor, const FrameworkID& _frameworkId,
- bool _local)
+ const ExecutorID& _executorId, bool _local)
: slave(_slave), driver(_driver), executor(_executor),
- frameworkId(_frameworkId), local(_local), terminate(false) {}
+ frameworkId(_frameworkId), executorId(_executorId),
+ local(_local), terminate(false) {}
~ExecutorProcess() {}
@@ -51,6 +52,7 @@ protected:
// Register with slave.
Message<E2S_REGISTER_EXECUTOR> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
send(slave, out);
while(true) {
@@ -149,6 +151,7 @@ private:
MesosExecutorDriver* driver;
Executor* executor;
FrameworkID frameworkId;
+ ExecutorID executorId;
SlaveID slaveId;
bool local;
@@ -203,6 +206,7 @@ int MesosExecutorDriver::start()
PID slave;
FrameworkID frameworkId;
+ ExecutorID executorId;
char* value;
std::istringstream iss;
@@ -234,7 +238,16 @@ int MesosExecutorDriver::start()
frameworkId.set_value(value);
- process = new ExecutorProcess(slave, this, executor, frameworkId, local);
+ /* Get executor ID from environment. */
+ value = getenv("MESOS_EXECUTOR_ID");
+
+ if (value == NULL)
+ fatal("expecting MESOS_EXECUTOR_ID in environment");
+
+ executorId.set_value(value);
+
+ process =
+ new ExecutorProcess(slave, this, executor, frameworkId, executorId, local);
Process::spawn(process);
@@ -312,11 +325,15 @@ int MesosExecutorDriver::sendFrameworkMe
return -1;
}
- // Validate that they set the correct slave ID.
+ // Validate that they set the correct slave ID and executor ID.
if (!(process->slaveId == message.slave_id())) {
return -1;
}
+ if (!(process->executorId == message.executor_id())) {
+ return -1;
+ }
+
// TODO(benh): Do a dispatch to Executor first?
Message<E2S_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(process->frameworkId);
Modified: incubator/mesos/trunk/src/launcher/launcher.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/launcher.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/launcher.cpp (original)
+++ incubator/mesos/trunk/src/launcher/launcher.cpp Sun Jun 5 09:06:18 2011
@@ -31,7 +31,8 @@ using std::string;
using std::vector;
-ExecutorLauncher::ExecutorLauncher(FrameworkID _frameworkId,
+ExecutorLauncher::ExecutorLauncher(const FrameworkID& _frameworkId,
+ const ExecutorID& _executorId,
const string& _executorUri,
const string& _user,
const string& _workDirectory,
@@ -42,7 +43,8 @@ ExecutorLauncher::ExecutorLauncher(Frame
bool _redirectIO,
bool _shouldSwitchUser,
const map<string, string>& _params)
- : frameworkId(_frameworkId), executorUri(_executorUri), user(_user),
+ : frameworkId(_frameworkId), executorId(_executorId),
+ executorUri(_executorUri), user(_user),
workDirectory(_workDirectory), slavePid(_slavePid),
frameworksHome(_frameworksHome), mesosHome(_mesosHome),
hadoopHome(_hadoopHome), redirectIO(_redirectIO),
@@ -228,6 +230,7 @@ void ExecutorLauncher::setupEnvironment(
// Set Mesos environment variables to pass slave ID, framework ID, etc.
setenv("MESOS_SLAVE_PID", slavePid.c_str(), true);
setenv("MESOS_FRAMEWORK_ID", frameworkId.value().c_str(), true);
+ setenv("MESOS_EXECUTOR_ID", executorId.value().c_str(), true);
// Set LIBPROCESS_PORT so that we bind to a random free port.
setenv("LIBPROCESS_PORT", "0", true);
Modified: incubator/mesos/trunk/src/launcher/launcher.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/launcher.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/launcher.hpp (original)
+++ incubator/mesos/trunk/src/launcher/launcher.hpp Sun Jun 5 09:06:18 2011
@@ -31,6 +31,7 @@ using std::vector;
class ExecutorLauncher {
protected:
FrameworkID frameworkId;
+ ExecutorID executorId;
string executorUri;
string user;
string workDirectory; // Directory in which the framework should run
@@ -43,7 +44,8 @@ protected:
map<string, string> params; // Key-value params in framework's ExecutorInfo
public:
- ExecutorLauncher(FrameworkID _frameworkId, const string& _executorUri,
+ ExecutorLauncher(const FrameworkID& _frameworkId,
+ const ExecutorID& _executorId, const string& _executorUri,
const string& _user, const string& _workDirectory,
const string& _slavePid, const string& _frameworksHome,
const string& _mesosHome, const string& _hadoopHome,
Modified: incubator/mesos/trunk/src/launcher/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/launcher/main.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/launcher/main.cpp (original)
+++ incubator/mesos/trunk/src/launcher/main.cpp Sun Jun 5 09:06:18 2011
@@ -26,7 +26,11 @@ int main(int argc, char **argv)
FrameworkID frameworkId;
frameworkId.set_value(getenvOrFail("MESOS_FRAMEWORK_ID"));
+ ExecutorID executorId;
+ executorId.set_value(getenvOrFail("MESOS_EXECUTOR_ID"));
+
ExecutorLauncher(frameworkId,
+ executorId,
getenvOrFail("MESOS_EXECUTOR_URI"),
getenvOrFail("MESOS_USER"),
getenvOrFail("MESOS_WORK_DIRECTORY"),
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:06:18 2011
@@ -805,8 +805,7 @@ void Master::operator () ()
}
case M2M_TIMER_TICK: {
- unordered_map<SlaveID, Slave *> slavesCopy = slaves;
- foreachpair (_, Slave *slave, slavesCopy) {
+ foreachpaircopy (_, Slave *slave, slaves) {
if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
LOG(INFO) << slave
<< " missing heartbeats ... considering disconnected";
@@ -848,8 +847,7 @@ void Master::operator () ()
framework->active = false;
// Remove the framework's slot offers.
- unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
- foreach (SlotOffer* offer, slotOffersCopy) {
+ foreachcopy (SlotOffer* offer, framework->slotOffers) {
removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
@@ -1182,8 +1180,7 @@ void Master::failoverFramework(Framework
// Remove the framework's slot offers (if they weren't removed before).
// TODO(benh): Consider just reoffering these to the new framework.
- unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
- foreach (SlotOffer* offer, slotOffersCopy) {
+ foreachcopy (SlotOffer* offer, framework->slotOffers) {
removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
@@ -1231,16 +1228,14 @@ void Master::removeFramework(Framework *
}
// Remove pointers to the framework's tasks in slaves
- unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
- foreachpair (_, Task *task, tasksCopy) {
+ foreachpaircopy (_, Task *task, framework->tasks) {
Slave *slave = lookupSlave(task->slave_id());
CHECK(slave != NULL);
removeTask(task, TRR_FRAMEWORK_LOST);
}
// Remove the framework's slot offers (if they weren't removed before).
- unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
- foreach (SlotOffer* offer, slotOffersCopy) {
+ foreachcopy (SlotOffer* offer, framework->slotOffers) {
removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
}
@@ -1264,8 +1259,7 @@ void Master::removeSlave(Slave *slave)
// TODO: Notify allocator that a slave removal is beginning?
// Remove pointers to slave's tasks in frameworks, and send status updates
- unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
- foreachpair (_, Task *task, tasksCopy) {
+ foreachpaircopy (_, Task *task, slave->tasks) {
Framework *framework = lookupFramework(task->framework_id());
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected. This can be a tricky
@@ -1288,8 +1282,7 @@ void Master::removeSlave(Slave *slave)
}
// Remove slot offers from the slave; this will also rescind them
- unordered_set<SlotOffer *> slotOffersCopy = slave->slotOffers;
- foreach (SlotOffer *offer, slotOffersCopy) {
+ foreachcopy (SlotOffer *offer, slave->slotOffers) {
// Only report resources on slaves other than this one to the allocator
vector<SlaveResources> otherSlaveResources;
foreach (const SlaveResources& r, offer->resources) {
Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun 5 09:06:18 2011
@@ -3,16 +3,18 @@ import "mesos.proto";
package mesos.internal;
// TODO(benh): It would be great if this could just be a
-// TaskDescription wherever it gets used! One performance reason why
-// we don't do that now is because storing whatever data is coupled
-// with a TaskDescription could be large and unnecessary.
+// TaskDescription wherever it gets used! We would need to add both
+// the framework_id field and the state field into TaskDescription
+// though. Also, one performance reason why we don't do that now is
+// because storing whatever data is coupled with a TaskDescription
+// could be large and unnecessary.
message Task {
- required string name = 1;
- required TaskID task_id = 2;
- required FrameworkID framework_id = 3;
- required SlaveID slave_id = 4;
- repeated Resource resources = 5;
- required TaskState state = 6;
+ required FrameworkID framework_id = 1;
+ required TaskState state = 2;
+ required string name = 3;
+ required TaskID task_id = 4;
+ required SlaveID slave_id = 5;
+ repeated Resource resources = 6;
}
@@ -147,6 +149,7 @@ message UpdateFrameworkMessage {
message RegisterExecutorMessage {
required FrameworkID framework_id = 1;
+ required ExecutorID executor_id = 2;
}
@@ -158,7 +161,8 @@ message ExecutorRegisteredMessage {
message ExitedExecutorMessage {
required SlaveID slave_id = 1;
required FrameworkID framework_id = 2;
- required int32 status = 3;
+ required ExecutorID executor_id = 3;
+ required int32 status = 4;
}
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 09:06:18 2011
@@ -726,6 +726,11 @@ int MesosSchedulerDriver::sendFrameworkM
return -1;
}
+ if (!message.has_executor_id()) {
+ VLOG(1) << "Missing ExecutorID (executor_id), cannot send message";
+ return -1;
+ }
+
Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, message);
return 0;
Modified: incubator/mesos/trunk/src/slave/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.hpp Sun Jun 5 09:06:18 2011
@@ -6,8 +6,9 @@
namespace mesos { namespace internal { namespace slave {
-class Framework;
class Slave;
+class Framework;
+class Executor;
class IsolationModule {
@@ -21,15 +22,15 @@ public:
virtual void initialize(Slave *slave) {}
// Called by the slave to launch an executor for a given framework.
- virtual void startExecutor(Framework *framework) = 0;
+ virtual void launchExecutor(Framework* framework, Executor* executor) = 0;
// Terminate a framework's executor, if it is still running.
// The executor is expected to be gone after this method exits.
- virtual void killExecutor(Framework *framework) = 0;
+ virtual void killExecutor(Framework* framework, Executor* executor) = 0;
// Update the resource limits for a given framework. This method will
// be called only after an executor for the framework is started.
- virtual void resourcesChanged(Framework *framework) {}
+ virtual void resourcesChanged(Framework *framework, Executor* executor) {}
};
}}}
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Sun Jun 5 09:06:18 2011
@@ -51,11 +51,11 @@ public:
virtual void initialize(Slave* slave);
- virtual void startExecutor(Framework* framework);
+ virtual void launchExecutor(Framework* framework, Executor* executor);
- virtual void killExecutor(Framework* framework);
+ virtual void killExecutor(Framework* framework, Executor* executor);
- virtual void resourcesChanged(Framework* framework);
+ virtual void resourcesChanged(Framework* framework, Executor* executor);
protected:
// Run a shell command formatted with varargs and return its exit code.
Modified: incubator/mesos/trunk/src/slave/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/main.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/main.cpp (original)
+++ incubator/mesos/trunk/src/slave/main.cpp Sun Jun 5 09:06:18 2011
@@ -6,12 +6,15 @@
#include "slave.hpp"
#include "webui.hpp"
-using boost::lexical_cast;
-using boost::bad_lexical_cast;
+using namespace mesos::internal;
+using namespace mesos::internal::slave;
-using namespace std;
+using boost::bad_lexical_cast;
+using boost::lexical_cast;
-using namespace mesos::internal::slave;
+using std::cerr;
+using std::endl;
+using std::string;
void usage(const char *programName, const Configurator& configurator)
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun 5 09:06:18 2011
@@ -13,6 +13,8 @@ using boost::lexical_cast;
using boost::unordered_map;
using boost::unordered_set;
+using launcher::ExecutorLauncher;
+
using std::cerr;
using std::cout;
using std::endl;
@@ -53,13 +55,13 @@ void ProcessBasedIsolationModule::initia
}
-void ProcessBasedIsolationModule::startExecutor(Framework* framework)
+void ProcessBasedIsolationModule::launchExecutor(Framework* framework, Executor* executor)
{
if (!initialized)
LOG(FATAL) << "Cannot launch executors before initialization!";
LOG(INFO) << "Starting executor for framework " << framework->frameworkId
- << ": " << framework->info.executor().uri();
+ << ": " << executor->info.uri();
pid_t pid;
if ((pid = fork()) == -1)
@@ -68,30 +70,31 @@ void ProcessBasedIsolationModule::startE
if (pid) {
// In parent process, record the pgid for killpg later.
LOG(INFO) << "Started executor, OS pid = " << pid;
- pgids[framework->frameworkId] = pid;
- framework->executorStatus = "PID: " + lexical_cast<string>(pid);
+ pgids[framework->frameworkId][executor->info.executor_id()] = pid;
+ executor->executorStatus = "PID: " + lexical_cast<string>(pid);
} else {
// In child process, make cleanup easier.
// if (setpgid(0, 0) < 0)
// PLOG(FATAL) << "Failed to put executor in own process group";
if ((pid = setsid()) == -1)
PLOG(FATAL) << "Failed to put executor in own session";
-
- createExecutorLauncher(framework)->run();
+
+ createExecutorLauncher(framework, executor)->run();
}
}
-void ProcessBasedIsolationModule::killExecutor(Framework* framework)
+void ProcessBasedIsolationModule::killExecutor(Framework* framework, Executor* executor)
{
- if (pgids[framework->frameworkId] != -1) {
+ if (pgids[framework->frameworkId][executor->info.executor_id()] != -1) {
// TODO(benh): Consider sending a SIGTERM, then after so much time
// if it still hasn't exited do a SIGKILL (can use a libprocess
// process for this).
- LOG(INFO) << "Sending SIGKILL to gpid " << pgids[framework->frameworkId];
- killpg(pgids[framework->frameworkId], SIGKILL);
- pgids[framework->frameworkId] = -1;
- framework->executorStatus = "No executor running";
+ LOG(INFO) << "Sending SIGKILL to gpid "
+ << pgids[framework->frameworkId][executor->info.executor_id()];
+ killpg(pgids[framework->frameworkId][executor->info.executor_id()], SIGKILL);
+ pgids[framework->frameworkId][executor->info.executor_id()] = -1;
+ executor->executorStatus = "No executor running";
// TODO(benh): Kill all of the process's descendants? Perhaps
// create a new libprocess process that continually tries to kill
@@ -99,39 +102,39 @@ void ProcessBasedIsolationModule::killEx
// to kill the executor last ... maybe this is just too much of a
// burden?
- pgids.erase(framework->frameworkId);
+ pgids[framework->frameworkId].erase(executor->info.executor_id());
}
}
-void ProcessBasedIsolationModule::resourcesChanged(Framework* framework)
+void ProcessBasedIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
{
// Do nothing; subclasses may override this.
}
-ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework)
+ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework, Executor* executor)
{
- const Configuration& conf = slave->getConfiguration();
-
+ // Create a map of parameters for the executor launcher.
map<string, string> params;
- for (int i = 0; i < framework->info.executor().params().param_size(); i++) {
- params[framework->info.executor().params().param(i).key()] =
- framework->info.executor().params().param(i).value();
+ for (int i = 0; i < executor->info.params().param_size(); i++) {
+ params[executor->info.params().param(i).key()] =
+ executor->info.params().param(i).value();
}
- return
+ return
new ExecutorLauncher(framework->frameworkId,
- framework->info.executor().uri(),
+ executor->info.executor_id(),
+ executor->info.uri(),
framework->info.user(),
slave->getUniqueWorkDirectory(framework->frameworkId),
slave->self(),
- conf.get("frameworks_home", ""),
- conf.get("home", ""),
- conf.get("hadoop_home", ""),
+ slave->getConfiguration().get("frameworks_home", ""),
+ slave->getConfiguration().get("home", ""),
+ slave->getConfiguration().get("hadoop_home", ""),
!slave->local,
- conf.get("switch_user", true),
+ slave->getConfiguration().get("switch_user", true),
params);
}
@@ -151,18 +154,20 @@ void ProcessBasedIsolationModule::Reaper
pid_t pid;
int status;
if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
- foreachpair (const FrameworkID& frameworkId, pid_t pgid,
- module->pgids) {
- if (pgid == pid) {
- // Kill the process group to clean up the tasks.
- LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
- killpg(pgid, SIGKILL);
- module->pgids[frameworkId] = -1;
- LOG(INFO) << "Telling slave of lost framework " << frameworkId;
- // TODO(benh): This is broken if/when libprocess is parallel!
- module->slave->executorExited(frameworkId, status);
- module->pgids.erase(frameworkId);
- break;
+ foreachpair (const FrameworkID& frameworkId, _, module->pgids) {
+ foreachpair (const ExecutorID& executorId, pid_t pgid, module->pgids[frameworkId]) {
+ if (pgid == pid) {
+ // Kill the process group to clean up the tasks.
+ LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
+ killpg(pgid, SIGKILL);
+ module->pgids[frameworkId][executorId] = -1;
+ LOG(INFO) << "Telling slave of lost executor " << executorId
+ << " of framework " << frameworkId;
+ // TODO(benh): This is broken if/when libprocess is parallel!
+ module->slave->executorExited(frameworkId, executorId, status);
+ module->pgids[frameworkId].erase(executorId);
+ break;
+ }
}
}
}
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp Sun Jun 5 09:06:18 2011
@@ -15,11 +15,20 @@
namespace mesos { namespace internal { namespace slave {
-using boost::unordered_map;
-using mesos::internal::launcher::ExecutorLauncher;
-
class ProcessBasedIsolationModule : public IsolationModule {
public:
+ ProcessBasedIsolationModule();
+
+ virtual ~ProcessBasedIsolationModule();
+
+ virtual void initialize(Slave *slave);
+
+ virtual void launchExecutor(Framework* framework, Executor* executor);
+
+ virtual void killExecutor(Framework* framework, Executor* executor);
+
+ virtual void resourcesChanged(Framework* framework, Executor* executor);
+
// Reaps child processes and tells the slave if they exit
class Reaper : public Process {
ProcessBasedIsolationModule* module;
@@ -34,25 +43,6 @@ public:
// Extra shutdown message for reaper
enum { SHUTDOWN_REAPER = PROCESS_MSGID };
-private:
- bool initialized;
- Slave* slave;
- unordered_map<FrameworkID, pid_t> pgids;
- Reaper* reaper;
-
-public:
- ProcessBasedIsolationModule();
-
- virtual ~ProcessBasedIsolationModule();
-
- virtual void initialize(Slave *slave);
-
- virtual void startExecutor(Framework *framework);
-
- virtual void killExecutor(Framework* framework);
-
- virtual void resourcesChanged(Framework* framework);
-
protected:
// Main method executed after a fork() to create a Launcher for launching
// an executor's process. The Launcher will create the child's working
@@ -61,7 +51,13 @@ protected:
// Subclasses of ProcessBasedIsolationModule that wish to override the
// default launching behavior should override createExecutorLauncher() and
// return their own ExecutorLauncher object (possibly a subclass of it).
- virtual ExecutorLauncher* createExecutorLauncher(Framework* framework);
+ virtual launcher::ExecutorLauncher* createExecutorLauncher(Framework* framework, Executor* executor);
+
+private:
+ bool initialized;
+ Slave* slave;
+ boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, pid_t> > pgids;
+ Reaper* reaper;
};
}}}
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:06:18 2011
@@ -90,39 +90,42 @@ state::SlaveState *Slave::getState()
new state::SlaveState(BUILD_DATE, BUILD_USER, slaveId.value(),
cpus.value(), mem.value(), self(), master);
- foreachpair(_, Framework *f, frameworks) {
- Resources resources(f->resources);
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
- state::Framework *framework =
- new state::Framework(f->frameworkId.value(), f->info.name(),
- f->info.executor().uri(), f->executorStatus,
- cpus.value(), mem.value());
-
- state->frameworks.push_back(framework);
-
- foreachpair(_, Task *t, f->tasks) {
- Resources resources(t->resources());
- Resource::Scalar cpus;
- Resource::Scalar mem;
- cpus.set_value(-1);
- mem.set_value(-1);
- cpus = resources.getScalar("cpus", cpus);
- mem = resources.getScalar("mem", mem);
-
- state::Task *task =
- new state::Task(t->task_id().value(), t->name(),
- TaskState_descriptor()->FindValueByNumber(t->state())->name(),
- cpus.value(), mem.value());
+// foreachpair (_, Framework *f, frameworks) {
- framework->tasks.push_back(task);
- }
- }
+// foreachpair (_, Executor* e, f->executors) {
+
+// Resources resources(e->resources);
+// Resource::Scalar cpus;
+// Resource::Scalar mem;
+// cpus.set_value(-1);
+// mem.set_value(-1);
+// cpus = resources.getScalar("cpus", cpus);
+// mem = resources.getScalar("mem", mem);
+
+// state::Framework *framework =
+// new state::Framework(f->frameworkId.value(), f->info.name(),
+// f->info.executor().uri(), f->executorStatus,
+// cpus.value(), mem.value());
+
+// state->frameworks.push_back(framework);
+
+// foreachpair(_, Task *t, f->tasks) {
+// Resources resources(t->resources());
+// Resource::Scalar cpus;
+// Resource::Scalar mem;
+// cpus.set_value(-1);
+// mem.set_value(-1);
+// cpus = resources.getScalar("cpus", cpus);
+// mem = resources.getScalar("mem", mem);
+
+// state::Task *task =
+// new state::Task(t->task_id().value(), t->name(),
+// TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+// cpus.value(), mem.value());
+
+// framework->tasks.push_back(task);
+// }
+// }
return state;
}
@@ -181,11 +184,13 @@ void Slave::operator () ()
out.mutable_slave_id()->MergeFrom(slaveId);
out.mutable_slave()->MergeFrom(slave);
- foreachpair(_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks) {
- out.add_task()->MergeFrom(*task);
- }
- }
+ foreachpair (_, Framework* framework, frameworks) {
+ foreachpair (_, Executor* executor, framework->executors) {
+ foreachpair (_, Task* task, executor->tasks) {
+ out.add_task()->MergeFrom(*task);
+ }
+ }
+ }
send(master, out);
}
@@ -241,33 +246,48 @@ void Slave::operator () ()
LOG(INFO) << "Got assigned task " << task.task_id()
<< " for framework " << msg.framework_id();
- // Start an executor if one isn't already running.
Framework *framework = getFramework(msg.framework_id());
if (framework == NULL) {
framework =
- new Framework(msg.framework(), msg.framework_id(), msg.pid());
-
+ new Framework(msg.framework_id(), msg.framework(), msg.pid());
frameworks[msg.framework_id()] = framework;
- isolationModule->startExecutor(framework);
}
- // Create a local task.
- Task *t = framework->addTask(task);
-
- // Either send the task to an executor or queue the task until
- // the executor has started.
- Executor *executor = getExecutor(msg.framework_id());
+ // Either send the task to an executor or start a new executor
+ // and queue the task until the executor has started.
+ Executor* executor = task.has_executor()
+ ? framework->getExecutor(task.executor().executor_id())
+ : framework->getExecutor(framework->info.executor().executor_id());
+
if (executor != NULL) {
- Message<S2E_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(executor->pid, out);
- isolationModule->resourcesChanged(framework);
+ if (!executor->pid) {
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+ } else {
+ // Add the task to the executor.
+ executor->addTask(task);
+
+ Message<S2E_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task);
+ send(executor->pid, out);
+ isolationModule->resourcesChanged(framework, executor);
+ }
} else {
- // Executor not yet registered; queue task for when it starts up
- framework->queuedTasks.push_back(task);
+ // Launch an executor for this task.
+ if (task.has_executor()) {
+ executor = framework->createExecutor(task.executor());
+ } else {
+ executor = framework->createExecutor(framework->info.executor());
+ }
+
+ // Queue task until the executor starts up.
+ executor->queuedTasks.push_back(task);
+
+ // Tell the isolation module to launch the executor.
+ isolationModule->launchExecutor(framework, executor);
}
break;
}
@@ -280,45 +300,47 @@ void Slave::operator () ()
Framework* framework = getFramework(msg.framework_id());
if (framework != NULL) {
- // Tell the executor to kill the task if it is up and
- // running, if not
- Executor* executor = getExecutor(msg.framework_id());
- if (executor != NULL) {
- Message<S2E_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_task_id()->MergeFrom(msg.task_id());
- send(executor->pid, out);
- } else {
- // Update the resources locally, if an executor comes up
- // after this then it just won't receive this task.
- framework->removeTask(msg.task_id());
- isolationModule->resourcesChanged(framework);
-
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- TaskStatus *status = out.mutable_status();
- status->mutable_task_id()->MergeFrom(msg.task_id());
- status->mutable_slave_id()->MergeFrom(slaveId);
- status->set_state(TASK_LOST);
-
- int seq = rsend(master, framework->pid, out);
- seqs[msg.framework_id()].insert(seq);
- }
- } else {
- LOG(ERROR) << "Cannot kill task " << msg.task_id()
- << " of framework " << msg.framework_id()
- << " because no such framework is running";
+ // Tell the executor to kill the task if it is up and
+ // running, otherwise, consider the task lost.
+ Executor* executor = framework->getExecutor(msg.task_id());
+ if (executor == NULL || !executor->pid) {
+ // Update the resources locally, if an executor comes up
+ // after this then it just won't receive this task.
+ executor->removeTask(msg.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+
+ Message<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(msg.task_id());
+ status->mutable_slave_id()->MergeFrom(slaveId);
+ status->set_state(TASK_LOST);
+
+ int seq = rsend(master, framework->pid, out);
+ seqs[msg.framework_id()].insert(seq);
+ } else {
+ // Otherwise, send a message to the executor and wait for
+ // it to send us a status update.
+ Message<S2E_KILL_TASK> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_task_id()->MergeFrom(msg.task_id());
+ send(executor->pid, out);
+ }
+ } else {
+ LOG(WARNING) << "Cannot kill task " << msg.task_id()
+ << " of framework " << msg.framework_id()
+ << " because no such framework is running";
Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
TaskStatus *status = out.mutable_status();
status->mutable_task_id()->MergeFrom(msg.task_id());
status->mutable_slave_id()->MergeFrom(slaveId);
status->set_state(TASK_LOST);
- int seq = rsend(master, out);
- seqs[msg.framework_id()].insert(seq);
- }
+ int seq = rsend(master, out);
+ seqs[msg.framework_id()].insert(seq);
+ }
break;
}
@@ -336,22 +358,35 @@ void Slave::operator () ()
case M2S_FRAMEWORK_MESSAGE: {
const Message<M2S_FRAMEWORK_MESSAGE>&msg = message();
- const FrameworkMessage& message = msg.message();
+ Framework* framework = getFramework(msg.framework_id());
+ if (framework != NULL) {
+ const FrameworkMessage& message = msg.message();
- Executor* executor = getExecutor(msg.framework_id());
- if (executor != NULL) {
- Message<S2E_FRAMEWORK_MESSAGE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_message()->MergeFrom(message);
- send(executor->pid, out);
+ Executor* executor = framework->getExecutor(message.executor_id());
+ if (executor == NULL) {
+ LOG(WARNING) << "Dropping message for executor "
+ << message.executor_id() << " of framework "
+ << msg.framework_id()
+ << " because executor does not exist";
+ } else if (!executor->pid) {
+ // TODO(*): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for executor "
+ << message.executor_id() << " of framework "
+ << msg.framework_id()
+ << " because executor is not running";
+ } else {
+ Message<S2E_FRAMEWORK_MESSAGE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_message()->MergeFrom(message);
+ send(executor->pid, out);
+ }
} else {
- VLOG(1) << "Dropping framework message for framework "
- << msg.framework_id()
- << " because its executor is not running";
- }
- // TODO(*): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
+ LOG(WARNING) << "Dropping message for framework "
+ << msg.framework_id()
+ << " because it does not exist";
+ }
break;
}
@@ -370,37 +405,50 @@ void Slave::operator () ()
case E2S_REGISTER_EXECUTOR: {
const Message<E2S_REGISTER_EXECUTOR>& msg = message();
- LOG(INFO) << "Got executor registration for framework "
+ LOG(INFO) << "Got registration for executor "
+ << msg.executor_id() << " of framework "
<< msg.framework_id();
Framework* framework = getFramework(msg.framework_id());
if (framework != NULL) {
- Executor* executor = getExecutor(msg.framework_id());
- if (executor != NULL) {
- LOG(ERROR) << "Executor for framework " << msg.framework_id()
- << "already exists";
+ Executor* executor = framework->getExecutor(msg.executor_id());
+
+ // Check the status of the executor.
+ if (executor == NULL) {
+ LOG(WARNING) << "Not expecting executor " << msg.executor_id()
+ << " of framework " << msg.framework_id();
+ send(from(), S2E_KILL_EXECUTOR);
+ } else if (executor->pid != PID()) {
+ LOG(WARNING) << "Not good, executor " << msg.executor_id()
+ << " of framework " << msg.framework_id()
+ << " is already running";
send(from(), S2E_KILL_EXECUTOR);
- break;
+ } else {
+ // Save the pid for the executor.
+ executor->pid = from();
+
+ // Now that the executor is up, set its resource limits.
+ isolationModule->resourcesChanged(framework, executor);
+
+ // Tell executor it's registered and give it any queued tasks.
+ Message<S2E_REGISTER_REPLY> out;
+ ExecutorArgs* args = out.mutable_args();
+ args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+ args->set_name(framework->info.name());
+ args->mutable_slave_id()->MergeFrom(slaveId);
+ args->set_hostname(hostname);
+ args->set_data(framework->info.executor().data());
+ send(executor->pid, out);
+ sendQueuedTasks(framework, executor);
}
-
- executor = new Executor(msg.framework_id(), from());
- executors[msg.framework_id()] = executor;
-
- // Now that the executor is up, set its resource limits
- isolationModule->resourcesChanged(framework);
-
- // Tell executor that it's registered and give it its queued tasks
- Message<S2E_REGISTER_REPLY> out;
- ExecutorArgs* args = out.mutable_args();
- args->mutable_framework_id()->MergeFrom(framework->frameworkId);
- args->set_name(framework->info.name());
- args->mutable_slave_id()->MergeFrom(slaveId);
- args->set_hostname(hostname);
- args->set_data(framework->info.executor().data());
- send(executor->pid, out);
- sendQueuedTasks(framework, executor);
} else {
- // Framework is gone; tell the executor to exit
+ // Framework is gone; tell the executor to exit.
+ LOG(WARNING) << "Framework " << msg.framework_id()
+ << " does not exist (it may have been killed),"
+ << " telling executor to exit";
+
+ // TODO(benh): Shouldn't we also tell the isolation module to
+ // shut this executor down?
send(from(), S2E_KILL_EXECUTOR);
}
break;
@@ -418,24 +466,30 @@ void Slave::operator () ()
Framework *framework = getFramework(msg.framework_id());
if (framework != NULL) {
- if (status.state() == TASK_FINISHED ||
- status.state() == TASK_FAILED ||
- status.state() == TASK_KILLED ||
- status.state() == TASK_LOST) {
- framework->removeTask(status.task_id());
- isolationModule->resourcesChanged(framework);
+ Executor* executor = framework->getExecutor(status.task_id());
+ if (executor != NULL) {
+ if (status.state() == TASK_FINISHED ||
+ status.state() == TASK_FAILED ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_LOST) {
+ executor->removeTask(status.task_id());
+ isolationModule->resourcesChanged(framework, executor);
+ }
+
+ // Reliably send message and save sequence number for
+ // canceling later.
+ Message<S2M_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(msg.framework_id());
+ out.mutable_status()->MergeFrom(status);
+ int seq = rsend(master, framework->pid, out);
+ seqs[msg.framework_id()].insert(seq);
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "executor for framework " << msg.framework_id();
}
-
- // Reliably send message and save sequence number for
- // canceling later.
- Message<S2M_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(msg.framework_id());
- out.mutable_status()->MergeFrom(status);
- int seq = rsend(master, framework->pid, out);
- seqs[msg.framework_id()].insert(seq);
} else {
- LOG(ERROR) << "Status update error: couldn't lookup "
- << "framework " << msg.framework_id();
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "framework " << msg.framework_id();
}
break;
}
@@ -482,8 +536,7 @@ void Slave::operator () ()
case M2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by master: " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
+ foreachpaircopy (_, Framework *framework, frameworks) {
killFramework(framework);
}
return;
@@ -491,8 +544,7 @@ void Slave::operator () ()
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
- unordered_map<FrameworkID, Framework*> frameworksCopy = frameworks;
- foreachpair (_, Framework *framework, frameworksCopy) {
+ foreachpaircopy (_, Framework *framework, frameworks) {
killFramework(framework);
}
return;
@@ -508,21 +560,13 @@ void Slave::operator () ()
}
-Framework * Slave::getFramework(const FrameworkID& frameworkId)
+Framework* Slave::getFramework(const FrameworkID& frameworkId)
{
- if (frameworks.count(frameworkId) > 0)
+ if (frameworks.count(frameworkId) > 0) {
return frameworks[frameworkId];
- else
- return NULL;
-}
-
+ }
-Executor * Slave::getExecutor(const FrameworkID& frameworkId)
-{
- if (executors.count(frameworkId) > 0)
- return executors[frameworkId];
- else
- return NULL;
+ return NULL;
}
@@ -533,7 +577,12 @@ void Slave::sendQueuedTasks(Framework* f
LOG(INFO) << "Flushing queued tasks for framework "
<< framework->frameworkId;
- foreach(const TaskDescription& task, framework->queuedTasks) {
+ CHECK(executor->pid != PID());
+
+ foreach (const TaskDescription& task, executor->queuedTasks) {
+ // Add the task to the executor.
+ executor->addTask(task);
+
Message<S2E_RUN_TASK> out;
out.mutable_framework()->MergeFrom(framework->info);
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
@@ -542,12 +591,12 @@ void Slave::sendQueuedTasks(Framework* f
send(executor->pid, out);
}
- framework->queuedTasks.clear();
+ executor->queuedTasks.clear();
}
// Kill a framework (including its executors if killExecutors is true).
-void Slave::killFramework(Framework *framework, bool killExecutor)
+void Slave::killFramework(Framework *framework, bool killExecutors)
{
LOG(INFO) << "Cleaning up framework " << framework->frameworkId;
@@ -558,29 +607,23 @@ void Slave::killFramework(Framework *fra
seqs.erase(framework->frameworkId);
- // Remove its allocated resources.
- framework->resources = Resources();
+ // Shutdown all executors of this framework.
+ foreachpaircopy (const ExecutorID& executorId, Executor* executor, framework->executors) {
+ if (killExecutors) {
+ LOG(INFO) << "Killing executor " << executorId
+ << " of framework " << framework->frameworkId;
- // If an executor is running, tell it to exit and kill it.
+ send(executor->pid, S2E_KILL_EXECUTOR);
- Executor *executor = getExecutor(framework->frameworkId);
- if (executor != NULL) {
- if (killExecutor) {
- LOG(INFO) << "Killing executor for framework "
- << framework->frameworkId;
// TODO(benh): There really isn't ANY time between when an
// executor gets a S2E_KILL_EXECUTOR message and the isolation
// module goes and kills it. We should really think about making
// the semantics of this better.
- send(executor->pid, S2E_KILL_EXECUTOR);
- isolationModule->killExecutor(framework);
- }
- LOG(INFO) << "Cleaning up executor for framework "
- << framework->frameworkId;
+ isolationModule->killExecutor(framework, executor);
+ }
- executors.erase(framework->frameworkId);
- delete executor;
+ framework->destroyExecutor(executorId);
}
frameworks.erase(framework->frameworkId);
@@ -591,20 +634,39 @@ void Slave::killFramework(Framework *fra
// Called by isolation module when an executor process exits
// TODO(benh): Make this callback be a message so that we can avoid
// race conditions.
-void Slave::executorExited(const FrameworkID& frameworkId, int status)
+void Slave::executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status)
{
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- LOG(INFO) << "Executor for framework " << frameworkId << " exited "
- << "with status " << status;
-
- Message<S2M_EXITED_EXECUTOR> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.set_status(status);
- send(master, out);
-
- killFramework(framework, false);
+ Executor* executor = framework->getExecutor(executorId);
+ if (executor != NULL) {
+ LOG(INFO) << "Exited executor " << executorId
+ << " of framework " << frameworkId
+ << " with status " << status;
+
+ Message<S2M_EXITED_EXECUTOR> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_status(status);
+ send(master, out);
+
+ framework->destroyExecutor(executorId);
+
+ // TODO(benh): When should we kill the presence of an entire
+ // framework on a slave?
+ if (framework->executors.size() == 0) {
+ killFramework(framework);
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor " << executorId
+ << " of framework " << frameworkId
+ << " has exited with status " << status;
+ }
+ } else {
+ LOG(WARNING) << "UNKNOWN executor " << executorId
+ << " of UNKNOWN framework " << frameworkId
+ << " has exited with status " << status;
}
};
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun 5 09:06:18 2011
@@ -45,44 +45,16 @@
namespace mesos { namespace internal { namespace slave {
-using namespace mesos;
-using namespace mesos::internal;
-
-using std::list;
-using std::pair;
-using std::make_pair;
-using std::ostringstream;
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
using foreach::_;
-// Information about a framework
-struct Framework
+// Information describing an executor (goes away if executor crashes).
+struct Executor
{
- FrameworkInfo info;
- FrameworkID frameworkId;
- PID pid;
-
- list<TaskDescription> queuedTasks; // Holds tasks until executor starts
- unordered_map<TaskID, Task *> tasks;
-
- Resources resources;
+ Executor(const FrameworkID& _frameworkId, const ExecutorInfo& _info)
+ : frameworkId(_frameworkId), info(_info), pid(PID()) {}
- // Information about the status of the executor for this framework, set by
- // the isolation module. For example, this might include a PID, a VM ID, etc.
- string executorStatus;
-
- Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
- const PID& _pid)
- : info(_info), frameworkId(_frameworkId), pid(_pid) {}
-
- ~Framework()
+ ~Executor()
{
// Delete the tasks.
foreachpair (_, Task *task, tasks) {
@@ -90,39 +62,28 @@ struct Framework
}
}
- Task * lookupTask(const TaskID& taskId)
- {
- unordered_map<TaskID, Task *>::iterator it = tasks.find(taskId);
- if (it != tasks.end())
- return it->second;
- else
- return NULL;
- }
-
- Task * addTask(const TaskDescription& task)
+ Task* addTask(const TaskDescription& task)
{
// The master should enforce unique task IDs, but just in case
// maybe we shouldn't make this a fatal error.
CHECK(tasks.count(task.task_id()) == 0);
Task *t = new Task();
+ t->mutable_framework_id()->MergeFrom(frameworkId);
+ t->set_state(TASK_STARTING);
t->set_name(task.name());
t->mutable_task_id()->MergeFrom(task.task_id());
- t->mutable_framework_id()->MergeFrom(frameworkId);
t->mutable_slave_id()->MergeFrom(task.slave_id());
t->mutable_resources()->MergeFrom(task.resources());
- t->set_state(TASK_STARTING);
tasks[task.task_id()] = t;
resources += task.resources();
-
- return t;
}
void removeTask(const TaskID& taskId)
{
// Remove task from the queue if it's queued
- for (list<TaskDescription>::iterator it = queuedTasks.begin();
+ for (std::list<TaskDescription>::iterator it = queuedTasks.begin();
it != queuedTasks.end(); ++it) {
if ((*it).task_id() == taskId) {
queuedTasks.erase(it);
@@ -140,17 +101,75 @@ struct Framework
delete task;
}
}
+
+ const FrameworkID frameworkId;
+ const ExecutorInfo info;
+
+ PID pid;
+
+ std::list<TaskDescription> queuedTasks;
+ boost::unordered_map<TaskID, Task*> tasks;
+
+ Resources resources;
+
+ // Information about the status of the executor for this framework, set by
+ // the isolation module. For example, this might include a PID, a VM ID, etc.
+ std::string executorStatus;
};
-// A connection to an executor (goes away if executor crashes)
-struct Executor
+// Information about a framework.
+struct Framework
{
- FrameworkID frameworkId;
+ Framework( const FrameworkID& _frameworkId, const FrameworkInfo& _info,
+ const PID& _pid)
+ : frameworkId(_frameworkId), info(_info), pid(_pid) {}
+
+ ~Framework() {}
+
+ Executor* createExecutor(const ExecutorInfo& info)
+ {
+ Executor* executor = new Executor(frameworkId, info);
+ CHECK(executors.count(info.executor_id()) == 0);
+ executors[info.executor_id()] = executor;
+ return executor;
+ }
+
+ void destroyExecutor(const ExecutorID& executorId)
+ {
+ if (executors.count(executorId) > 0) {
+ Executor* executor = executors[executorId];
+ executors.erase(executorId);
+ delete executor;
+ }
+ }
+
+ Executor* getExecutor(const ExecutorID& executorId)
+ {
+ if (executors.count(executorId) > 0) {
+ return executors[executorId];
+ }
+
+ return NULL;
+ }
+
+ Executor* getExecutor(const TaskID& taskId)
+ {
+ foreachpair (_, Executor* executor, executors) {
+ if (executor->tasks.count(taskId) > 0) {
+ return executor;
+ }
+ }
+
+ return NULL;
+ }
+
+ const FrameworkID frameworkId;
+ const FrameworkInfo info;
+
PID pid;
-
- Executor(const FrameworkID& _frameworkId, const PID& _pid)
- : frameworkId(_frameworkId), pid(_pid) {}
+
+ boost::unordered_map<ExecutorID, Executor*> executors;
};
@@ -206,12 +225,12 @@ public:
state::SlaveState *getState();
// Callback used by isolation module to tell us when an executor exits.
- void executorExited(const FrameworkID& frameworkId, int status);
+ void executorExited(const FrameworkID& frameworkId, const ExecutorID& executorId, int status);
// Kill a framework (possibly killing its executors).
- void killFramework(Framework *framework, bool killExecutor = true);
+ void killFramework(Framework *framework, bool killExecutors = true);
- string getUniqueWorkDirectory(const FrameworkID& frameworkId);
+ std::string getUniqueWorkDirectory(const FrameworkID& frameworkId);
const Configuration& getConfiguration();
@@ -223,9 +242,7 @@ public:
protected:
virtual void operator () ();
- Framework * getFramework(const FrameworkID& frameworkId);
-
- Executor * getExecutor(const FrameworkID& frameworkId);
+ Framework* getFramework(const FrameworkID& frameworkId);
// Send any tasks queued up for the given framework to its executor
// (needed if we received tasks while the executor was starting up).
@@ -238,14 +255,13 @@ private:
Resources resources;
// Invariant: framework will exist if executor exists.
- unordered_map<FrameworkID, Framework*> frameworks;
- unordered_map<FrameworkID, Executor*> executors;
+ boost::unordered_map<FrameworkID, Framework*> frameworks;
+
+ // Sequence numbers of reliable messages sent per framework.
+ boost::unordered_map<FrameworkID, boost::unordered_set<int> > seqs;
IsolationModule *isolationModule;
Heart* heart;
-
- // Sequence numbers of reliable messages sent on behalf of framework.
- unordered_map<FrameworkID, unordered_set<int> > seqs;
};
}}}
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 09:06:18 2011
@@ -24,8 +24,6 @@ using boost::lexical_cast;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
-using mesos::internal::slave::Framework;
-using mesos::internal::slave::IsolationModule;
using mesos::internal::slave::ProcessBasedIsolationModule;
using std::string;
@@ -46,7 +44,7 @@ using testing::Sequence;
using testing::StrEq;
-class LocalIsolationModule : public IsolationModule
+class LocalIsolationModule : public slave::IsolationModule
{
public:
Executor* executor;
@@ -58,21 +56,22 @@ public:
virtual ~LocalIsolationModule() {}
- virtual void initialize(Slave* slave) {
+ virtual void initialize(slave::Slave* slave) {
pid = slave->self();
}
- virtual void startExecutor(Framework* framework) {
+ virtual void launchExecutor(slave::Framework* f, slave::Executor* e) {
// TODO(benh): Cleanup the way we launch local drivers!
setenv("MESOS_LOCAL", "1", 1);
setenv("MESOS_SLAVE_PID", pid.c_str(), 1);
- setenv("MESOS_FRAMEWORK_ID", framework->frameworkId.value().c_str(), 1);
+ setenv("MESOS_FRAMEWORK_ID", f->frameworkId.value().c_str(), 1);
+ setenv("MESOS_EXECUTOR_ID", e->info.executor_id().value().c_str(), 1);
driver = new MesosExecutorDriver(executor);
driver->start();
}
- virtual void killExecutor(Framework* framework) {
+ virtual void killExecutor(slave::Framework* f, slave::Executor* e) {
driver->stop();
driver->join();
delete driver;
@@ -81,6 +80,7 @@ public:
unsetenv("MESOS_LOCAL");
unsetenv("MESOS_SLAVE_PID");
unsetenv("MESOS_FRAMEWORK_ID");
+ unsetenv("MESOS_EXECUTOR_ID");
}
};
@@ -969,6 +969,7 @@ TEST(MasterTest, FrameworkMessage)
FrameworkMessage hello;
*hello.mutable_slave_id() = offers[0].slave_id();
+ hello.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
hello.set_data("hello");
schedDriver.sendFrameworkMessage(hello);
@@ -979,6 +980,7 @@ TEST(MasterTest, FrameworkMessage)
FrameworkMessage reply;
*reply.mutable_slave_id() = args.slave_id();
+ reply.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
reply.set_data("reply");
execDriver->sendFrameworkMessage(reply);
@@ -1113,6 +1115,7 @@ TEST(MasterTest, SchedulerFailoverFramew
FrameworkMessage message;
message.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+ message.mutable_executor_id()->set_value("default"); // TODO(benh): No constant!
execDriver->sendFrameworkMessage(message);
Modified: incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp Sun Jun 5 09:06:18 2011
@@ -56,6 +56,7 @@ public:
// TODO(benh): The following line crashes some Linux compilers. :(
// return DEFAULT_EXECUTOR_INFO;
ExecutorInfo executor;
+ executor.mutable_executor_id()->set_value("default");
executor.set_uri("noexecutor");
return executor;
}
Modified: incubator/mesos/trunk/src/tests/utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/utils.hpp?rev=1132242&r1=1132241&r2=1132242&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/utils.hpp (original)
+++ incubator/mesos/trunk/src/tests/utils.hpp Sun Jun 5 09:06:18 2011
@@ -45,8 +45,11 @@ void enterTestDirectory(const char* test
/**
* Macro to get a "default" dummy ExecutorInfo object for testing.
*/
-#define DEFAULT_EXECUTOR_INFO \
- ({ ExecutorInfo executor; executor.set_uri("noexecutor"); executor; })
+#define DEFAULT_EXECUTOR_INFO \
+ ({ ExecutorInfo executor; \
+ executor.mutable_executor_id()->set_value("default"); \
+ executor.set_uri("noexecutor"); \
+ executor; })
/**