You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:22:01 UTC
svn commit: r1131554 - in /incubator/mesos/trunk:
frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/
frameworks/haproxy+apache/ frameworks/mpi/ include/ src/ src/swig/
src/swig/java/ src/swig/python/
Author: benh
Date: Sun Jun 5 03:22:00 2011
New Revision: 1131554
URL: http://svn.apache.org/viewvc?rev=1131554&view=rev
Log:
Updated the frameworks to use the driver API. Also made the Java and
Python bindings keep a reference to the executor, so that it is not
garbage-collected while its NexusExecutorDriver exists.
Modified:
incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java
incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py
incubator/mesos/trunk/frameworks/mpi/startmpd.py
incubator/mesos/trunk/include/nexus_exec.hpp
incubator/mesos/trunk/include/nexus_sched.hpp
incubator/mesos/trunk/src/cpp_test_executor.cpp
incubator/mesos/trunk/src/memhog_executor.cpp
incubator/mesos/trunk/src/swig/java/TestExecutor.java
incubator/mesos/trunk/src/swig/java/TestFramework.java
incubator/mesos/trunk/src/swig/java/test_framework
incubator/mesos/trunk/src/swig/nexus.i
incubator/mesos/trunk/src/swig/python/test_exec.py
Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java Sun Jun 5 03:22:00 2011
@@ -21,7 +21,9 @@ import org.apache.hadoop.mapred.TaskStat
import nexus.Executor;
import nexus.ExecutorArgs;
+import nexus.ExecutorDriver;
import nexus.FrameworkMessage;
+import nexus.NexusExecutorDriver;
import nexus.TaskDescription;
import nexus.TaskState;
@@ -33,6 +35,7 @@ public class NexusExecutor extends Execu
private JobConf conf;
private TaskTracker taskTracker;
+ private ExecutorDriver driver;
private int slaveId;
private AtomicInteger nextRpcId = new AtomicInteger();
@@ -69,8 +72,9 @@ public class NexusExecutor extends Execu
}
@Override
- public void init(ExecutorArgs args) {
+ public void init(ExecutorDriver driver, ExecutorArgs args) {
try {
+ this.driver = driver;
slaveId = args.getSlaveId();
conf = new JobConf();
conf.set("mapred.job.tracker", new String(args.getData()));
@@ -86,7 +90,7 @@ public class NexusExecutor extends Execu
System.exit(1);
}
}
-
+
/**
* Remove expired unmapped tasks. We call this right before sending out every
* heartbeat to ensure that we never kill an unmapped task that we'll
@@ -123,12 +127,12 @@ public class NexusExecutor extends Execu
}
}
for (nexus.TaskStatus update: updates) {
- sendStatusUpdate(update);
+ driver.sendStatusUpdate(update);
}
}
@Override
- public void killTask(int taskId) {
+ public void killTask(ExecutorDriver d, int taskId) {
synchronized (this) {
TaskAttemptID hadoopId = nexusIdToHadoopId.get(taskId);
taskTracker.killTask(hadoopId);
@@ -136,7 +140,7 @@ public class NexusExecutor extends Execu
}
@Override
- public void startTask(TaskDescription taskDesc) {
+ public void startTask(ExecutorDriver d, TaskDescription taskDesc) {
String taskType = new String(taskDesc.getArg());
LOG.info("start_task " + taskDesc.getTaskId() + ": " + taskType);
if (taskType.equals("map")) {
@@ -155,7 +159,7 @@ public class NexusExecutor extends Execu
}
@Override
- public void frameworkMessage(FrameworkMessage message) {
+ public void frameworkMessage(ExecutorDriver d, FrameworkMessage message) {
try {
//LOG.info("Got RPC response of size " + message.getData().length);
ObjectWritable writable = new ObjectWritable();
@@ -227,7 +231,7 @@ public class NexusExecutor extends Execu
byte[] bytes = bos.toByteArray();
//LOG.info("RPC message length: " + bytes.length);
FrameworkMessage msg = new FrameworkMessage(0, 0, bytes);
- sendFrameworkMessage(msg);
+ driver.sendFrameworkMessage(msg);
// Wait for a reply
synchronized(response) {
@@ -305,7 +309,7 @@ public class NexusExecutor extends Execu
}
for (nexus.TaskStatus update: updates) {
if (update != null) {
- sendStatusUpdate(update);
+ driver.sendStatusUpdate(update);
}
}
@@ -387,7 +391,8 @@ public class NexusExecutor extends Execu
}
}
- public synchronized nexus.TaskStatus createTaskDoneUpdate(TaskStatus taskStatus) {
+ public synchronized nexus.TaskStatus createTaskDoneUpdate(
+ TaskStatus taskStatus) {
TaskAttemptID hadoopId = taskStatus.getTaskID();
if (hadoopIdToNexusId.containsKey(hadoopId)) {
Integer nexusId = hadoopIdToNexusId.get(hadoopId);
@@ -423,9 +428,13 @@ public class NexusExecutor extends Execu
}
return null;
}
+
+ public ExecutorDriver getDriver() {
+ return driver;
+ }
public static void main(String[] args) throws Exception {
System.loadLibrary("nexus");
- new NexusExecutor().run();
+ new NexusExecutorDriver(new NexusExecutor()).run();
}
}
Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Jun 5 03:22:00 2011
@@ -2096,7 +2096,7 @@ public class TaskTracker
// Report the task as done to Nexus so that it can report our slot
// as finished and get a new slot assigned before the next heartbeat
nexus.TaskStatus s = nexusExecutor.createTaskDoneUpdate(taskStatus);
- if (s != null) nexusExecutor.sendStatusUpdate(s);
+ if (s != null) nexusExecutor.getDriver().sendStatusUpdate(s);
}
}
@@ -2157,7 +2157,7 @@ public class TaskTracker
// Report the task as done to Nexus so that it can report our slot
// as finished and get a new slot assigned before the next heartbeat
nexus.TaskStatus s = nexusExecutor.createTaskDoneUpdate(taskStatus);
- if (s != null) nexusExecutor.sendStatusUpdate(s);
+ if (s != null) nexusExecutor.getDriver().sendStatusUpdate(s);
}
}
Modified: incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/haproxy%2Bapache/startapache.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py (original)
+++ incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py Sun Jun 5 03:22:00 2011
@@ -20,18 +20,18 @@ class MyExecutor(nexus.Executor):
nexus.Executor.__init__(self)
self.tid = -1
- def startTask(self, task):
+ def startTask(self, driver, task):
self.tid = task.taskId
Popen("/usr/apache2/2.2/bin/apachectl start", shell=True)
- def killTask(self, tid):
+ def killTask(self, driver, tid):
if (tid != self.tid):
print "Expecting different task id ... killing anyway!"
cleanup()
update = nexus.TaskStatus(tid, nexus.TASK_FINISHED, "")
- self.sendStatusUpdate(update)
+ driver.sendStatusUpdate(update)
- def shutdown(self):
+ def shutdown(self, driver):
cleanup()
- def error(self, code, message):
+ def error(self, driver, code, message):
@@ -41,4 +41,4 @@ if __name__ == "__main__":
print "Starting haproxy+apache executor"
atexit.register(cleanup)
executor = MyExecutor()
- executor.run()
+ nexus.NexusExecutorDriver(executor).run()
Modified: incubator/mesos/trunk/frameworks/mpi/startmpd.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mpi/startmpd.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/mpi/startmpd.py (original)
+++ incubator/mesos/trunk/frameworks/mpi/startmpd.py Sun Jun 5 03:22:00 2011
@@ -20,28 +20,28 @@ class MyExecutor(nexus.Executor):
def __init__(self):
nexus.Executor.__init__(self)
- def init(self, arg):
+ def init(self, driver, arg):
[ip,port] = arg.data.split(":")
self.ip = ip
self.port = port
- def startTask(self, task):
+ def startTask(self, driver, task):
print "Running task %d" % task.taskId
Popen("mpd -n -h "+self.ip+" -p "+self.port, shell=True)
- def killTask(self, tid):
+ def killTask(self, driver, tid):
# TODO(*): Kill only one of the mpd's!
sys.exit(1)
- def shutdown(self):
+ def shutdown(self, driver):
print "shutdown"
cleanup()
- def error(self, code, message):
+ def error(self, driver, code, message):
print "Error: %s" % message
if __name__ == "__main__":
print "Starting executor"
atexit.register(cleanup)
executor = MyExecutor()
- executor.run()
+ nexus.NexusExecutorDriver(executor).run()
Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun 5 03:22:00 2011
@@ -88,6 +88,9 @@ public:
virtual void sendStatusUpdate(const TaskStatus& status);
virtual void sendFrameworkMessage(const FrameworkMessage& message);
+ // Executor getter; required by some of the SWIG proxies
+ virtual Executor* getExecutor() { return executor; }
+
private:
friend class internal::ExecutorProcess;
Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun 5 03:22:00 2011
@@ -94,11 +94,12 @@ public:
const string_map& params);
virtual void reviveOffers();
virtual void sendHints(const string_map& hints);
-
- // Scheduler getter; mostly used in SWIG proxies
+
+ // Scheduler getter; required by some of the SWIG proxies
virtual Scheduler* getScheduler() { return sched; }
private:
+ // Internal utility method to report an error to the scheduler
void error(int code, const std::string& message);
std::string master;
Modified: incubator/mesos/trunk/src/cpp_test_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/cpp_test_executor.cpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/cpp_test_executor.cpp (original)
+++ incubator/mesos/trunk/src/cpp_test_executor.cpp Sun Jun 5 03:22:00 2011
@@ -12,22 +12,23 @@ class MyExecutor : public Executor
public:
virtual ~MyExecutor() {}
- virtual void init(const ExecutorArgs& args) {
+ virtual void init(ExecutorDriver*, const ExecutorArgs& args) {
cout << "Init" << endl;
}
- virtual void startTask(const TaskDescription& task) {
+ virtual void startTask(ExecutorDriver* d, const TaskDescription& task) {
cout << "Starting task " << task.taskId << endl;
sleep(1);
cout << "Finishing task " << task.taskId << endl;
TaskStatus status(task.taskId, TASK_FINISHED, "");
- sendStatusUpdate(status);
+ d->sendStatusUpdate(status);
}
};
-int main(int argc, char ** argv) {
+int main(int argc, char** argv) {
MyExecutor exec;
- exec.run();
+ NexusExecutorDriver driver(&exec);
+ driver.run();
return 0;
}
Modified: incubator/mesos/trunk/src/memhog_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog_executor.cpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog_executor.cpp (original)
+++ incubator/mesos/trunk/src/memhog_executor.cpp Sun Jun 5 03:22:00 2011
@@ -38,9 +38,12 @@ public:
double taskLen;
int64_t memToHog;
int threadsPerTask;
+ ExecutorDriver* driver;
+
virtual ~MemHogExecutor() {}
- virtual void init(const ExecutorArgs &args) {
+ virtual void init(ExecutorDriver* driver, const ExecutorArgs &args) {
+ this->driver = driver;
istringstream in(args.data);
in >> memToHog >> taskLen >> threadsPerTask;
cout << "Initialized: memToHog = " << memToHog
@@ -48,7 +51,7 @@ public:
<< ", threadsPerTask = " << threadsPerTask << endl;
}
- virtual void startTask(const TaskDescription& task) {
+ virtual void startTask(ExecutorDriver*, const TaskDescription& task) {
cout << "Executor starting task " << task.taskId << endl;
for (int i = 0; i < threadsPerTask; i++) {
ThreadArg *arg = new ThreadArg(this, task.taskId, i == 0);
@@ -66,7 +69,7 @@ void *runTask(void *arg)
MemHogExecutor *executor = threadArg->executor;
int64_t memToHog = executor->memToHog;
double taskLen = executor->taskLen;
- cout << "Running a task..." << endl;
+ cout << "Running a worker thread..." << endl;
char *data = new char[memToHog];
int32_t count = 0;
time_t start = time(0);
@@ -82,7 +85,7 @@ void *runTask(void *arg)
if (threadArg->primary) {
usleep(100000); // sleep 0.1 seconds
TaskStatus status(threadArg->tid, TASK_FINISHED, "");
- executor->sendStatusUpdate(status);
+ executor->driver->sendStatusUpdate(status);
}
return 0;
}
@@ -94,6 +97,7 @@ void *runTask(void *arg)
int main(int argc, char ** argv) {
MemHogExecutor exec;
- exec.run();
+ NexusExecutorDriver driver(&exec);
+ driver.run();
return 0;
}
Modified: incubator/mesos/trunk/src/swig/java/TestExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/TestExecutor.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/TestExecutor.java (original)
+++ incubator/mesos/trunk/src/swig/java/TestExecutor.java Sun Jun 5 03:22:00 2011
@@ -7,18 +7,23 @@ public class TestExecutor extends Execut
}
@Override
- public void startTask(final TaskDescription task) {
+ public void startTask(final ExecutorDriver d, final TaskDescription task) {
new Thread() { public void run() {
+ try {
System.out.println("Running task " + task.getTaskId());
- try { Thread.sleep(1000); } catch(Exception e) {}
- sendStatusUpdate(new TaskStatus(task.getTaskId(),
- TaskState.TASK_FINISHED,
- new byte[0]));
+ Thread.sleep(1000);
+ d.sendStatusUpdate(new TaskStatus(task.getTaskId(),
+ TaskState.TASK_FINISHED,
+ new byte[0]));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}}.start();
}
public static void main(String[] args) throws Exception {
TestExecutor exec = new TestExecutor();
- exec.run();
+ ExecutorDriver driver = new NexusExecutorDriver(exec);
+ driver.run();
}
}
Modified: incubator/mesos/trunk/src/swig/java/TestFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/TestFramework.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/TestFramework.java (original)
+++ incubator/mesos/trunk/src/swig/java/TestFramework.java Sun Jun 5 03:22:00 2011
@@ -30,13 +30,13 @@ public class TestFramework {
}
@Override
- public void registered(SchedulerDriver d, String fid) {
+ public void registered(SchedulerDriver d, int fid) {
System.out.println("Registered! FID = " + fid);
}
@Override
public void resourceOffer(SchedulerDriver d,
- String oid,
+ long oid,
SlaveOfferVector offers) {
System.out.println("Got offer offer " + oid);
TaskDescriptionVector tasks = new TaskDescriptionVector();
@@ -47,12 +47,11 @@ public class TestFramework {
taskParams.set("cpus", "1");
taskParams.set("mem", "134217728");
System.out.println("Launching task " + taskId);
- tasks.add(new TaskDescription(launchedTasks,
+ tasks.add(new TaskDescription(taskId,
offer.getSlaveId(),
"task " + taskId,
taskParams,
new byte[0]));
- launchedTasks++;
}
StringMap params = new StringMap();
params.set("timeout", "1");
@@ -60,23 +59,28 @@ public class TestFramework {
}
@Override
+ public void statusUpdate(SchedulerDriver d, TaskStatus status) {
+ System.out.println("Status update: task " + status.getTaskId() +
+ " is in state " + status.getState());
+ if (status.getState() == TaskState.TASK_FINISHED) {
+ finishedTasks++;
+ System.out.println("Finished tasks: " + finishedTasks);
+ if (finishedTasks == totalTasks)
+ d.stop();
+ }
+ }
+
+ @Override
public void error(SchedulerDriver d, int code, String message) {
System.out.println("Error: " + message);
}
}
public static void main(String[] args) throws Exception {
- //MyScheduler sched = new MyScheduler();
- //NexusSchedulerDriver driver = new NexusSchedulerDriver(sched, args[0]);
- //driver.run();
new NexusSchedulerDriver(new MyScheduler(), args[0]).run();
- new Thread() {
- public void run() {
- while(true) {
- System.gc();
- try {Thread.sleep(1000);} catch(Exception e) {}
- }
- }
- }.start();
+ // TODO: Java should just exit here, and it does so on Linux, but
+ // it doesn't on OS X. We should figure out why. It may have to do
+ // with the libprocess threads being around and not being marked
+ // as daemon threads by the JVM.
}
}
Modified: incubator/mesos/trunk/src/swig/java/test_framework
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/test_framework?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/test_framework (original)
+++ incubator/mesos/trunk/src/swig/java/test_framework Sun Jun 5 03:22:00 2011
@@ -1,2 +1,4 @@
#!/bin/sh
+FWDIR=`dirname "$0"`
+cd "$FWDIR"
-exec java -cp .:nexus.jar -Djava.library.path=. TestFramework $@
+exec java -cp .:nexus.jar -Djava.library.path=. TestFramework "$@"
Modified: incubator/mesos/trunk/src/swig/nexus.i
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/nexus.i?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/nexus.i (original)
+++ incubator/mesos/trunk/src/swig/nexus.i Sun Jun 5 03:22:00 2011
@@ -76,7 +76,7 @@
return $jnicall;
}
- /* Typemap for NexusSchedulerDriver to keep a reference to the Scheduler */
+ /* Typemaps for NexusSchedulerDriver to keep a reference to the Scheduler */
%typemap(javain) nexus::Scheduler* "getCPtrAndAddReference($javainput)"
%typemap(javacode) nexus::NexusSchedulerDriver %{
@@ -99,6 +99,30 @@
delete();
}
%}
+
+ /* Typemaps for NexusExecutorDriver to keep a reference to the Executor */
+ %typemap(javain) nexus::Executor* "getCPtrAndAddReference($javainput)"
+
+ %typemap(javacode) nexus::NexusExecutorDriver %{
+ private static java.util.HashSet<Executor> executors =
+ new java.util.HashSet<Executor>();
+
+ private static long getCPtrAndAddReference(Executor executor) {
+ synchronized (executors) {
+ executors.add(executor);
+ }
+ return Executor.getCPtr(executor);
+ }
+ %}
+
+ %typemap(javafinalize) nexus::NexusExecutorDriver %{
+ protected void finalize() {
+ synchronized (executors) {
+ executors.remove(getExecutor());
+ }
+ delete();
+ }
+ %}
#endif /* SWIGJAVA */
#ifdef SWIGPYTHON
@@ -107,7 +131,13 @@
%feature("pythonappend") nexus::NexusSchedulerDriver::NexusSchedulerDriver %{
self.scheduler = args[0]
%}
-#endif
+
+ /* Add a reference to executor in the Python wrapper object to prevent it
+ from being garbage-collected while the NexusExecutorDriver exists */
+ %feature("pythonappend") nexus::NexusExecutorDriver::NexusExecutorDriver %{
+ self.executor = args[0]
+ %}
+#endif /* SWIGPYTHON */
#ifdef SWIGRUBY
/* Hide NexusSchedulerDriver::getScheduler because it would require
Modified: incubator/mesos/trunk/src/swig/python/test_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/python/test_exec.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/python/test_exec.py (original)
+++ incubator/mesos/trunk/src/swig/python/test_exec.py Sun Jun 5 03:22:00 2011
@@ -7,18 +7,17 @@ class MyExecutor(nexus.Executor):
def __init__(self):
nexus.Executor.__init__(self)
- def startTask(self, task):
+ def startTask(self, driver, task):
print "Running task %d" % task.taskId
time.sleep(1)
print "Sending the update..."
update = nexus.TaskStatus(task.taskId, nexus.TASK_FINISHED, "")
- self.sendStatusUpdate(update)
+ driver.sendStatusUpdate(update)
print "Sent the update"
- def error(self, code, message):
+ def error(self, driver, code, message):
print "Error: %s" % message
if __name__ == "__main__":
print "Starting executor"
- executor = MyExecutor()
- executor.run()
+ nexus.NexusExecutorDriver(MyExecutor()).run()