You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:38:02 UTC
svn commit: r1131827 - in /incubator/mesos/trunk/src: memhog.cpp
memhog_executor.cpp
Author: benh
Date: Sun Jun 5 05:38:02 2011
New Revision: 1131827
URL: http://svn.apache.org/viewvc?rev=1131827&view=rev
Log:
Making memhog-executor support per-task arguments so that it can be used
for more interesting frameworks.
Modified:
incubator/mesos/trunk/src/memhog.cpp
incubator/mesos/trunk/src/memhog_executor.cpp
Modified: incubator/mesos/trunk/src/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog.cpp?rev=1131827&r1=1131826&r2=1131827&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog.cpp (original)
+++ incubator/mesos/trunk/src/memhog.cpp Sun Jun 5 05:38:02 2011
@@ -40,10 +40,7 @@ public:
}
virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
- ostringstream arg;
- arg << memToHog << " " << taskLen << " " << threadsPerTask;
- cout << "Executor arg: " << arg.str() << endl;
- return ExecutorInfo(executor, arg.str());
+ return ExecutorInfo(executor, "");
}
virtual void registered(SchedulerDriver*, FrameworkID fid) {
@@ -53,7 +50,6 @@ public:
virtual void resourceOffer(SchedulerDriver* d,
OfferID id,
const vector<SlaveOffer>& offers) {
- cout << "." << flush;
vector<TaskDescription> tasks;
foreach (const SlaveOffer &offer, offers) {
// This is kind of ugly because operator[] isn't a const function
@@ -61,12 +57,13 @@ public:
int64_t mem = lexical_cast<int64_t>(offer.params.find("mem")->second);
if ((tasksLaunched < totalTasks) && (cpus >= 1 && mem >= memToRequest)) {
TaskID tid = tasksLaunched++;
-
- cout << endl << "accepting it to start task " << tid << endl;
- map<string, string> taskParams;
- taskParams["cpus"] = "1";
- taskParams["mem"] = lexical_cast<string>(memToRequest);
- TaskDescription desc(tid, offer.slaveId, "task", taskParams, "");
+ cout << "Launcing task " << tid << " on " << offer.host << endl;
+ map<string, string> params;
+ params["cpus"] = "1";
+ params["mem"] = lexical_cast<string>(memToRequest);
+ ostringstream arg;
+ arg << memToHog << " " << taskLen << " " << threadsPerTask;
+ TaskDescription desc(tid, offer.slaveId, "task", params, arg.str());
tasks.push_back(desc);
}
}
@@ -76,11 +73,9 @@ public:
}
virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
- cout << endl << "Task " << status.taskId << " is in state " << status.state << endl;
- if (status.state == TASK_LOST)
- {
- cout << endl << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
- }
+ cout << "Task " << status.taskId << " is in state " << status.state << endl;
+ if (status.state == TASK_LOST)
+ cout << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
if (status.state == TASK_FINISHED)
tasksFinished++;
if (tasksFinished == totalTasks)
Modified: incubator/mesos/trunk/src/memhog_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog_executor.cpp?rev=1131827&r1=1131826&r2=1131827&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog_executor.cpp (original)
+++ incubator/mesos/trunk/src/memhog_executor.cpp Sun Jun 5 05:38:02 2011
@@ -2,59 +2,55 @@
#include <nexus_exec.hpp>
-#include <boost/lexical_cast.hpp>
-
#include <cstdlib>
#include <iostream>
#include <sstream>
-
using namespace std;
using namespace nexus;
-using boost::lexical_cast;
-
class MemHogExecutor;
struct ThreadArg
{
- MemHogExecutor *executor;
+ MemHogExecutor* executor;
TaskID taskId;
int threadId;
+ int64_t memToHog;
+ double duration;
- ThreadArg(MemHogExecutor *exec, TaskID task, int thread)
- : executor(exec), taskId(task), threadId(thread) {}
+ ThreadArg(MemHogExecutor* executor_, TaskID taskId_, int threadId_,
+ int64_t memToHog_, double duration_)
+ : executor(executor_), taskId(taskId_), threadId(threadId_),
+ memToHog(memToHog_), duration(duration_) {}
};
-void *runTask(void *arg);
+void* runTask(void* threadArg);
class MemHogExecutor : public Executor
{
public:
- double taskLen;
- int64_t memToHog;
- int threadsPerTask;
ExecutorDriver* driver;
virtual ~MemHogExecutor() {}
virtual void init(ExecutorDriver* driver, const ExecutorArgs &args) {
this->driver = driver;
- istringstream in(args.data);
- in >> memToHog >> taskLen >> threadsPerTask;
- cout << "Initialized: memToHog = " << memToHog
- << ", taskLen = " << taskLen
- << ", threadsPerTask = " << threadsPerTask << endl;
}
virtual void launchTask(ExecutorDriver*, const TaskDescription& task) {
cout << "Executor starting task " << task.taskId << endl;
- for (int i = 0; i < threadsPerTask; i++) {
- ThreadArg *arg = new ThreadArg(this, task.taskId, i);
+ int64_t memToHog;
+ double duration;
+ int numThreads;
+ istringstream in(task.arg);
+ in >> memToHog >> duration >> numThreads;
+ for (int i = 0; i < numThreads; i++) {
+ ThreadArg* arg = new ThreadArg(this, task.taskId, i, memToHog, duration);
pthread_t thread;
pthread_create(&thread, 0, runTask, arg);
pthread_detach(thread);
@@ -75,31 +71,28 @@ uint32_t nextRand(uint32_t x) {
// Function executed by each worker thread.
-void *runTask(void *arg)
+void* runTask(void* threadArg)
{
- ThreadArg *threadArg = (ThreadArg *) arg;
- MemHogExecutor *executor = threadArg->executor;
- int64_t memToHog = executor->memToHog;
- double taskLen = executor->taskLen;
+ ThreadArg* arg = (ThreadArg*) threadArg;
cout << "Running a worker thread..." << endl;
- char *data = new char[memToHog];
+ char* data = new char[arg->memToHog];
int32_t count = 0;
time_t start = time(0);
- uint32_t pos = threadArg->threadId;
+ uint32_t pos = arg->threadId;
while (true) {
pos = nextRand(pos);
- data[pos % memToHog] = pos;
+ data[pos % arg->memToHog] = pos;
count++;
if (count == 5000) {
// Check whether enough time has elapsed to end the task
count = 0;
time_t now = time(0);
- if (difftime(now, start) > taskLen) {
+ if (difftime(now, start) > arg->duration) {
delete[] data;
- if (threadArg->threadId == 0) {
+ if (arg->threadId == 0) {
usleep(100000); // sleep 0.1 seconds for other threads to finish
- TaskStatus status(threadArg->taskId, TASK_FINISHED, "");
- executor->driver->sendStatusUpdate(status);
+ TaskStatus status(arg->taskId, TASK_FINISHED, "");
+ arg->executor->driver->sendStatusUpdate(status);
}
return 0;
}
@@ -108,7 +101,7 @@ void *runTask(void *arg)
}
-int main(int argc, char ** argv) {
+int main(int argc, char** argv) {
MemHogExecutor exec;
NexusExecutorDriver driver(&exec);
driver.run();