You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:38:02 UTC

svn commit: r1131827 - in /incubator/mesos/trunk/src: memhog.cpp memhog_executor.cpp

Author: benh
Date: Sun Jun  5 05:38:02 2011
New Revision: 1131827

URL: http://svn.apache.org/viewvc?rev=1131827&view=rev
Log:
Making memhog-executor support per-task arguments so that it can be used
for more interesting frameworks.

Modified:
    incubator/mesos/trunk/src/memhog.cpp
    incubator/mesos/trunk/src/memhog_executor.cpp

Modified: incubator/mesos/trunk/src/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog.cpp?rev=1131827&r1=1131826&r2=1131827&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog.cpp (original)
+++ incubator/mesos/trunk/src/memhog.cpp Sun Jun  5 05:38:02 2011
@@ -40,10 +40,7 @@ public:
   }
 
   virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
-    ostringstream arg;
-    arg << memToHog << " " << taskLen << " " << threadsPerTask;
-    cout << "Executor arg: " << arg.str() << endl;
-    return ExecutorInfo(executor, arg.str());
+    return ExecutorInfo(executor, "");
   }
 
   virtual void registered(SchedulerDriver*, FrameworkID fid) {
@@ -53,7 +50,6 @@ public:
   virtual void resourceOffer(SchedulerDriver* d,
                              OfferID id,
                              const vector<SlaveOffer>& offers) {
-    cout << "." << flush;
     vector<TaskDescription> tasks;
     foreach (const SlaveOffer &offer, offers) {
       // This is kind of ugly because operator[] isn't a const function
@@ -61,12 +57,13 @@ public:
       int64_t mem = lexical_cast<int64_t>(offer.params.find("mem")->second);
       if ((tasksLaunched < totalTasks) && (cpus >= 1 && mem >= memToRequest)) {
         TaskID tid = tasksLaunched++;
-
-        cout << endl << "accepting it to start task " << tid << endl;
-        map<string, string> taskParams;
-        taskParams["cpus"] = "1";
-        taskParams["mem"] = lexical_cast<string>(memToRequest);
-        TaskDescription desc(tid, offer.slaveId, "task", taskParams, "");
+        cout << "Launcing task " << tid << " on " << offer.host << endl;
+        map<string, string> params;
+        params["cpus"] = "1";
+        params["mem"] = lexical_cast<string>(memToRequest);
+        ostringstream arg;
+        arg << memToHog << " " << taskLen << " " << threadsPerTask;
+        TaskDescription desc(tid, offer.slaveId, "task", params, arg.str());
         tasks.push_back(desc);
       }
     }
@@ -76,11 +73,9 @@ public:
   }
 
   virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
-    cout << endl << "Task " << status.taskId << " is in state " << status.state << endl;
-    if (status.state == TASK_LOST) 
-       {
-	  cout << endl << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
-       }
+    cout << "Task " << status.taskId << " is in state " << status.state << endl;
+    if (status.state == TASK_LOST)
+      cout << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
     if (status.state == TASK_FINISHED)
       tasksFinished++;
     if (tasksFinished == totalTasks)

Modified: incubator/mesos/trunk/src/memhog_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog_executor.cpp?rev=1131827&r1=1131826&r2=1131827&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog_executor.cpp (original)
+++ incubator/mesos/trunk/src/memhog_executor.cpp Sun Jun  5 05:38:02 2011
@@ -2,59 +2,55 @@
 
 #include <nexus_exec.hpp>
 
-#include <boost/lexical_cast.hpp>
-
 #include <cstdlib>
 #include <iostream>
 #include <sstream>
 
-
 using namespace std;
 using namespace nexus;
 
-using boost::lexical_cast;
-
 
 class MemHogExecutor;
 
 
 struct ThreadArg
 {
-  MemHogExecutor *executor;
+  MemHogExecutor* executor;
   TaskID taskId;
   int threadId;
+  int64_t memToHog;
+  double duration;
 
-  ThreadArg(MemHogExecutor *exec, TaskID task, int thread)
-    : executor(exec), taskId(task), threadId(thread) {}
+  ThreadArg(MemHogExecutor* executor_, TaskID taskId_, int threadId_,
+            int64_t memToHog_, double duration_)
+    : executor(executor_), taskId(taskId_), threadId(threadId_),
+      memToHog(memToHog_), duration(duration_) {}
 };
 
 
-void *runTask(void *arg);
+void* runTask(void* threadArg);
 
 
 class MemHogExecutor : public Executor
 {
 public:
-  double taskLen;
-  int64_t memToHog;
-  int threadsPerTask;
   ExecutorDriver* driver;
 
   virtual ~MemHogExecutor() {}
 
   virtual void init(ExecutorDriver* driver, const ExecutorArgs &args) {
     this->driver = driver;
-    istringstream in(args.data);
-    in >> memToHog >> taskLen >> threadsPerTask;
-    cout << "Initialized: memToHog = " << memToHog
-         << ", taskLen = " << taskLen
-         << ", threadsPerTask = " << threadsPerTask << endl;
   }
 
   virtual void launchTask(ExecutorDriver*, const TaskDescription& task) {
     cout << "Executor starting task " << task.taskId << endl;
-    for (int i = 0; i < threadsPerTask; i++) {
-      ThreadArg *arg = new ThreadArg(this, task.taskId, i);
+    int64_t memToHog;
+    double duration;
+    int numThreads;
+    istringstream in(task.arg);
+    in >> memToHog >> duration >> numThreads;
+    for (int i = 0; i < numThreads; i++) {
+      ThreadArg* arg = new ThreadArg(this, task.taskId, i, memToHog, duration);
       pthread_t thread;
       pthread_create(&thread, 0, runTask, arg);
       pthread_detach(thread);
@@ -75,31 +71,28 @@ uint32_t nextRand(uint32_t x) {
 
 
 // Function executed by each worker thread.
-void *runTask(void *arg)
+void* runTask(void* threadArg)
 {
-  ThreadArg *threadArg = (ThreadArg *) arg;
-  MemHogExecutor *executor = threadArg->executor;
-  int64_t memToHog = executor->memToHog;
-  double taskLen = executor->taskLen;
+  ThreadArg* arg = (ThreadArg*) threadArg;
   cout << "Running a worker thread..." << endl;
-  char *data = new char[memToHog];
+  char* data = new char[arg->memToHog];
   int32_t count = 0;
   time_t start = time(0);
-  uint32_t pos = threadArg->threadId;
+  uint32_t pos = arg->threadId;
   while (true) {
     pos = nextRand(pos);
-    data[pos % memToHog] = pos;
+    data[pos % arg->memToHog] = pos;
     count++;
     if (count == 5000) {
       // Check whether enough time has elapsed to end the task
       count = 0;
       time_t now = time(0);
-      if (difftime(now, start) > taskLen) {
+      if (difftime(now, start) > arg->duration) {
         delete[] data;
-        if (threadArg->threadId == 0) {
+        if (arg->threadId == 0) {
           usleep(100000); // sleep 0.1 seconds for other threads to finish
-          TaskStatus status(threadArg->taskId, TASK_FINISHED, "");
-          executor->driver->sendStatusUpdate(status);
+          TaskStatus status(arg->taskId, TASK_FINISHED, "");
+          arg->executor->driver->sendStatusUpdate(status);
         }
         return 0;
       }
@@ -108,7 +101,7 @@ void *runTask(void *arg)
 }
 
 
-int main(int argc, char ** argv) {
+int main(int argc, char** argv) {
   MemHogExecutor exec;
   NexusExecutorDriver driver(&exec);
   driver.run();