You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:22:01 UTC

svn commit: r1131554 - in /incubator/mesos/trunk: frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/ frameworks/haproxy+apache/ frameworks/mpi/ include/ src/ src/swig/ src/swig/java/ src/swig/python/

Author: benh
Date: Sun Jun  5 03:22:00 2011
New Revision: 1131554

URL: http://svn.apache.org/viewvc?rev=1131554&view=rev
Log:
Updated frameworks to use driver API. Also implemented proper
referencing of executor in Java and Python.

Modified:
    incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java
    incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py
    incubator/mesos/trunk/frameworks/mpi/startmpd.py
    incubator/mesos/trunk/include/nexus_exec.hpp
    incubator/mesos/trunk/include/nexus_sched.hpp
    incubator/mesos/trunk/src/cpp_test_executor.cpp
    incubator/mesos/trunk/src/memhog_executor.cpp
    incubator/mesos/trunk/src/swig/java/TestExecutor.java
    incubator/mesos/trunk/src/swig/java/TestFramework.java
    incubator/mesos/trunk/src/swig/java/test_framework
    incubator/mesos/trunk/src/swig/nexus.i
    incubator/mesos/trunk/src/swig/python/test_exec.py

Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/NexusExecutor.java Sun Jun  5 03:22:00 2011
@@ -21,7 +21,9 @@ import org.apache.hadoop.mapred.TaskStat
 
 import nexus.Executor;
 import nexus.ExecutorArgs;
+import nexus.ExecutorDriver;
 import nexus.FrameworkMessage;
+import nexus.NexusExecutorDriver;
 import nexus.TaskDescription;
 import nexus.TaskState;
 
@@ -33,6 +35,7 @@ public class NexusExecutor extends Execu
   private JobConf conf;
   private TaskTracker taskTracker;
 
+  private ExecutorDriver driver;
   private int slaveId;
   
   private AtomicInteger nextRpcId = new AtomicInteger();
@@ -69,8 +72,9 @@ public class NexusExecutor extends Execu
   }
   
   @Override
-  public void init(ExecutorArgs args) {
+  public void init(ExecutorDriver driver, ExecutorArgs args) {
     try {
+      this.driver = driver;
       slaveId = args.getSlaveId();
       conf = new JobConf();
       conf.set("mapred.job.tracker", new String(args.getData()));
@@ -86,7 +90,7 @@ public class NexusExecutor extends Execu
       System.exit(1);
     }
   }
-  
+
   /**
    * Remove expired unmapped tasks. We call this right before sending out every
    * heartbeat to ensure that we never kill an unmapped task that we'll
@@ -123,12 +127,12 @@ public class NexusExecutor extends Execu
       }
     }
     for (nexus.TaskStatus update: updates) {
-      sendStatusUpdate(update);
+      driver.sendStatusUpdate(update);
     }
   }
 
   @Override
-  public void killTask(int taskId) {
+  public void killTask(ExecutorDriver d, int taskId) {
     synchronized (this) {
       TaskAttemptID hadoopId = nexusIdToHadoopId.get(taskId);
       taskTracker.killTask(hadoopId);
@@ -136,7 +140,7 @@ public class NexusExecutor extends Execu
   }
 
   @Override
-  public void startTask(TaskDescription taskDesc) {
+  public void startTask(ExecutorDriver d, TaskDescription taskDesc) {
     String taskType = new String(taskDesc.getArg());
     LOG.info("start_task " + taskDesc.getTaskId() + ": " + taskType);
     if (taskType.equals("map")) {
@@ -155,7 +159,7 @@ public class NexusExecutor extends Execu
   }
   
   @Override
-  public void frameworkMessage(FrameworkMessage message) {
+  public void frameworkMessage(ExecutorDriver d, FrameworkMessage message) {
     try {
       //LOG.info("Got RPC response of size " + message.getData().length);
       ObjectWritable writable = new ObjectWritable();
@@ -227,7 +231,7 @@ public class NexusExecutor extends Execu
     byte[] bytes = bos.toByteArray();
     //LOG.info("RPC message length: " + bytes.length);
     FrameworkMessage msg = new FrameworkMessage(0, 0, bytes);
-    sendFrameworkMessage(msg);
+    driver.sendFrameworkMessage(msg);
     
     // Wait for a reply
     synchronized(response) {
@@ -305,7 +309,7 @@ public class NexusExecutor extends Execu
       }
       for (nexus.TaskStatus update: updates) {
         if (update != null) {
-          sendStatusUpdate(update);
+          driver.sendStatusUpdate(update);
         }
       }
       
@@ -387,7 +391,8 @@ public class NexusExecutor extends Execu
     }
   }
 
-  public synchronized nexus.TaskStatus createTaskDoneUpdate(TaskStatus taskStatus) {
+  public synchronized nexus.TaskStatus createTaskDoneUpdate(
+      TaskStatus taskStatus) {
     TaskAttemptID hadoopId = taskStatus.getTaskID();
     if (hadoopIdToNexusId.containsKey(hadoopId)) {
       Integer nexusId = hadoopIdToNexusId.get(hadoopId);
@@ -423,9 +428,13 @@ public class NexusExecutor extends Execu
     }
     return null;
   }
+  
+  public ExecutorDriver getDriver() {
+    return driver;
+  }
 
   public static void main(String[] args) throws Exception {
     System.loadLibrary("nexus");
-    new NexusExecutor().run();
+    new NexusExecutorDriver(new NexusExecutor()).run();
   }
 }

Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sun Jun  5 03:22:00 2011
@@ -2096,7 +2096,7 @@ public class TaskTracker 
         // Report the task as done to Nexus so that it can report our slot
         // as finished and get a new slot assigned before the next heartbeat
         nexus.TaskStatus s = nexusExecutor.createTaskDoneUpdate(taskStatus);
-        if (s != null) nexusExecutor.sendStatusUpdate(s);
+        if (s != null) nexusExecutor.getDriver().sendStatusUpdate(s);
       }
     }
 
@@ -2157,7 +2157,7 @@ public class TaskTracker 
         // Report the task as done to Nexus so that it can report our slot
         // as finished and get a new slot assigned before the next heartbeat
         nexus.TaskStatus s = nexusExecutor.createTaskDoneUpdate(taskStatus);
-        if (s != null) nexusExecutor.sendStatusUpdate(s);
+        if (s != null) nexusExecutor.getDriver().sendStatusUpdate(s);
       }
     }
     

Modified: incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/haproxy%2Bapache/startapache.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py (original)
+++ incubator/mesos/trunk/frameworks/haproxy+apache/startapache.py Sun Jun  5 03:22:00 2011
@@ -20,18 +20,18 @@ class MyExecutor(nexus.Executor):
     nexus.Executor.__init__(self)
     self.tid = -1
 
-  def startTask(self, task):
+  def startTask(self, driver, task):
     self.tid = task.taskId
     Popen("/usr/apache2/2.2/bin/apachectl start", shell=True)
 
-  def killTask(self, tid):
+  def killTask(self, driver, tid):
     if (tid != self.tid):
       print "Expecting different task id ... killing anyway!"
     cleanup()
     update = nexus.TaskStatus(tid, nexus.TASK_FINISHED, "")
-    self.sendStatusUpdate(update)
+    driver.sendStatusUpdate(update)
 
-  def shutdown(self):
+  def shutdown(self, driver):
     cleanup()
 
   def error(self, code, message):
@@ -41,4 +41,4 @@ if __name__ == "__main__":
   print "Starting haproxy+apache executor"
   atexit.register(cleanup)
   executor = MyExecutor()
-  executor.run()
+  nexus.NexusExecutorDriver(executor).run()

Modified: incubator/mesos/trunk/frameworks/mpi/startmpd.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mpi/startmpd.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/mpi/startmpd.py (original)
+++ incubator/mesos/trunk/frameworks/mpi/startmpd.py Sun Jun  5 03:22:00 2011
@@ -20,28 +20,28 @@ class MyExecutor(nexus.Executor):
   def __init__(self):
     nexus.Executor.__init__(self)
 
-  def init(self, arg):
+  def init(self, driver, arg):
     [ip,port] = arg.data.split(":")
     self.ip = ip
     self.port = port
 
-  def startTask(self, task):
+  def startTask(self, driver, task):
     print "Running task %d" % task.taskId
     Popen("mpd -n -h "+self.ip+" -p "+self.port, shell=True)
 
-  def killTask(self, tid):
+  def killTask(self, driver, tid):
     # TODO(*): Kill only one of the mpd's!
     sys.exit(1)
 
-  def shutdown(self):
+  def shutdown(self, driver):
     print "shutdown"
     cleanup()
 
-  def error(self, code, message):
+  def error(self, driver, code, message):
     print "Error: %s" % message
 
 if __name__ == "__main__":
   print "Starting executor"
   atexit.register(cleanup)
   executor = MyExecutor()
-  executor.run()
+  nexus.NexusExecutorDriver(executor).run()

Modified: incubator/mesos/trunk/include/nexus_exec.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_exec.hpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_exec.hpp (original)
+++ incubator/mesos/trunk/include/nexus_exec.hpp Sun Jun  5 03:22:00 2011
@@ -88,6 +88,9 @@ public:
   virtual void sendStatusUpdate(const TaskStatus& status);
   virtual void sendFrameworkMessage(const FrameworkMessage& message);
 
+  // Executor getter; required by some of the SWIG proxies
+  virtual Executor* getExecutor() { return executor; }
+
 private:
   friend class internal::ExecutorProcess;
 

Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun  5 03:22:00 2011
@@ -94,11 +94,12 @@ public:
                             const string_map& params);
   virtual void reviveOffers();
   virtual void sendHints(const string_map& hints);
-  
-  // Scheduler getter; mostly used in SWIG proxies
+
+  // Scheduler getter; required by some of the SWIG proxies
   virtual Scheduler* getScheduler() { return sched; }
 
 private:
+  // Internal utility method to report an error to the scheduler
   void error(int code, const std::string& message);
 
   std::string master;

Modified: incubator/mesos/trunk/src/cpp_test_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/cpp_test_executor.cpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/cpp_test_executor.cpp (original)
+++ incubator/mesos/trunk/src/cpp_test_executor.cpp Sun Jun  5 03:22:00 2011
@@ -12,22 +12,23 @@ class MyExecutor : public Executor
 public:
   virtual ~MyExecutor() {}
 
-  virtual void init(const ExecutorArgs& args) {
+  virtual void init(ExecutorDriver*, const ExecutorArgs& args) {
     cout << "Init" << endl;
   }
 
-  virtual void startTask(const TaskDescription& task) {
+  virtual void startTask(ExecutorDriver* d, const TaskDescription& task) {
     cout << "Starting task " << task.taskId << endl;
     sleep(1);
     cout << "Finishing task " << task.taskId << endl;
     TaskStatus status(task.taskId, TASK_FINISHED, "");
-    sendStatusUpdate(status);
+    d->sendStatusUpdate(status);
   }
 };
 
 
-int main(int argc, char ** argv) {
+int main(int argc, char** argv) {
   MyExecutor exec;
-  exec.run();
+  NexusExecutorDriver driver(&exec);
+  driver.run();
   return 0;
 }

Modified: incubator/mesos/trunk/src/memhog_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog_executor.cpp?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog_executor.cpp (original)
+++ incubator/mesos/trunk/src/memhog_executor.cpp Sun Jun  5 03:22:00 2011
@@ -38,9 +38,12 @@ public:
   double taskLen;
   int64_t memToHog;
   int threadsPerTask;
+  ExecutorDriver* driver;
+
   virtual ~MemHogExecutor() {}
 
-  virtual void init(const ExecutorArgs &args) {
+  virtual void init(ExecutorDriver* driver, const ExecutorArgs &args) {
+    this->driver = driver;
     istringstream in(args.data);
     in >> memToHog >> taskLen >> threadsPerTask;
     cout << "Initialized: memToHog = " << memToHog
@@ -48,7 +51,7 @@ public:
          << ", threadsPerTask = " << threadsPerTask << endl;
   }
 
-  virtual void startTask(const TaskDescription& task) {
+  virtual void startTask(ExecutorDriver*, const TaskDescription& task) {
     cout << "Executor starting task " << task.taskId << endl;
     for (int i = 0; i < threadsPerTask; i++) {
       ThreadArg *arg = new ThreadArg(this, task.taskId, i == 0);
@@ -66,7 +69,7 @@ void *runTask(void *arg)
   MemHogExecutor *executor = threadArg->executor;
   int64_t memToHog = executor->memToHog;
   double taskLen = executor->taskLen;
-  cout << "Running a task..." << endl;
+  cout << "Running a worker thread..." << endl;
   char *data = new char[memToHog];
   int32_t count = 0;
   time_t start = time(0);
@@ -82,7 +85,7 @@ void *runTask(void *arg)
           if (threadArg->primary) {
             usleep(100000); // sleep 0.1 seconds
             TaskStatus status(threadArg->tid, TASK_FINISHED, "");
-            executor->sendStatusUpdate(status);
+            executor->driver->sendStatusUpdate(status);
           }
           return 0;
         }
@@ -94,6 +97,7 @@ void *runTask(void *arg)
 
 int main(int argc, char ** argv) {
   MemHogExecutor exec;
-  exec.run();
+  NexusExecutorDriver driver(&exec);
+  driver.run();
   return 0;
 }

Modified: incubator/mesos/trunk/src/swig/java/TestExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/TestExecutor.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/TestExecutor.java (original)
+++ incubator/mesos/trunk/src/swig/java/TestExecutor.java Sun Jun  5 03:22:00 2011
@@ -7,18 +7,23 @@ public class TestExecutor extends Execut
   }
 
   @Override
-  public void startTask(final TaskDescription task) {
+  public void startTask(final ExecutorDriver d, final TaskDescription task) {
     new Thread() { public void run() {
+      try {
         System.out.println("Running task " + task.getTaskId());
-        try { Thread.sleep(1000); } catch(Exception e) {}
-        sendStatusUpdate(new TaskStatus(task.getTaskId(),
-                                        TaskState.TASK_FINISHED,
-                                        new byte[0]));
+        Thread.sleep(1000);
+        d.sendStatusUpdate(new TaskStatus(task.getTaskId(),
+                                          TaskState.TASK_FINISHED,
+                                          new byte[0]));
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }}.start();
   }
 
   public static void main(String[] args) throws Exception {
     TestExecutor exec = new TestExecutor();
-    exec.run();
+    ExecutorDriver driver = new NexusExecutorDriver(exec);
+    driver.run();
   }
 }

Modified: incubator/mesos/trunk/src/swig/java/TestFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/TestFramework.java?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/TestFramework.java (original)
+++ incubator/mesos/trunk/src/swig/java/TestFramework.java Sun Jun  5 03:22:00 2011
@@ -30,13 +30,13 @@ public class TestFramework {
     }
 
     @Override
-    public void registered(SchedulerDriver d, String fid) {
+    public void registered(SchedulerDriver d, int fid) {
       System.out.println("Registered! FID = " + fid);
     }
 
     @Override
     public void resourceOffer(SchedulerDriver d,
-                              String oid,
+                              long oid,
                               SlaveOfferVector offers) {
       System.out.println("Got offer offer " + oid);
       TaskDescriptionVector tasks = new TaskDescriptionVector();
@@ -47,12 +47,11 @@ public class TestFramework {
         taskParams.set("cpus", "1");
         taskParams.set("mem", "134217728");
         System.out.println("Launching task " + taskId);
-        tasks.add(new TaskDescription(launchedTasks,
+        tasks.add(new TaskDescription(taskId,
                                       offer.getSlaveId(),
                                       "task " + taskId,
                                       taskParams,
                                       new byte[0]));
-	launchedTasks++;
       }
       StringMap params = new StringMap();
       params.set("timeout", "1");
@@ -60,23 +59,28 @@ public class TestFramework {
     }
 
     @Override
+    public void statusUpdate(SchedulerDriver d, TaskStatus status) {
+      System.out.println("Status update: task " + status.getTaskId() +
+                         " is in state " + status.getState());
+      if (status.getState() == TaskState.TASK_FINISHED) {
+        finishedTasks++;
+        System.out.println("Finished tasks: " + finishedTasks);
+        if (finishedTasks == totalTasks)
+          d.stop();
+      }
+    }
+
+    @Override
     public void error(SchedulerDriver d, int code, String message) {
       System.out.println("Error: " + message);
     }
   }
 
   public static void main(String[] args) throws Exception {
-    //MyScheduler sched = new MyScheduler();
-    //NexusSchedulerDriver driver = new NexusSchedulerDriver(sched, args[0]);
-    //driver.run();
     new NexusSchedulerDriver(new MyScheduler(), args[0]).run();
-    new Thread() {
-      public void run() {
-        while(true) {
-          System.gc();
-          try {Thread.sleep(1000);} catch(Exception e) {}
-        }
-      }
-    }.start();
+    // TODO: Java should just exit here, and it does so on Linux, but
+    // it doesn't on OS X. We should figure out why. It may have to do
+    // with the libprocess threads being around and not being marked
+    // as daemon threads by the JVM.
   }
 }

Modified: incubator/mesos/trunk/src/swig/java/test_framework
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/java/test_framework?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/java/test_framework (original)
+++ incubator/mesos/trunk/src/swig/java/test_framework Sun Jun  5 03:22:00 2011
@@ -1,2 +1,4 @@
 #!/bin/sh
+FWDIR=`dirname $0`
+cd $FWDIR
 exec java -cp .:nexus.jar -Djava.library.path=. TestFramework $@

Modified: incubator/mesos/trunk/src/swig/nexus.i
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/nexus.i?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/nexus.i (original)
+++ incubator/mesos/trunk/src/swig/nexus.i Sun Jun  5 03:22:00 2011
@@ -76,7 +76,7 @@
     return $jnicall; 
   } 
 
-  /* Typemap for NexusSchedulerDriver to keep a reference to the Scheduler */
+  /* Typemaps for NexusSchedulerDriver to keep a reference to the Scheduler */
   %typemap(javain) nexus::Scheduler* "getCPtrAndAddReference($javainput)"
 
   %typemap(javacode) nexus::NexusSchedulerDriver %{
@@ -99,6 +99,30 @@
       delete();
     }
   %}
+
+  /* Typemaps for NexusExecutorDriver to keep a reference to the Executor */
+  %typemap(javain) nexus::Executor* "getCPtrAndAddReference($javainput)"
+
+  %typemap(javacode) nexus::NexusExecutorDriver %{
+    private static java.util.HashSet<Executor> executors =
+      new java.util.HashSet<Executor>();
+
+    private static long getCPtrAndAddReference(Executor executor) {
+      synchronized (executors) {
+        executors.add(executor);
+      }
+      return Executor.getCPtr(executor);
+    }
+  %}
+
+  %typemap(javafinalize) nexus::NexusExecutorDriver %{
+    protected void finalize() {
+      synchronized (executors) {
+        executors.remove(getExecutor());
+      }
+      delete();
+    }
+  %}
 #endif /* SWIGJAVA */
 
 #ifdef SWIGPYTHON
@@ -107,7 +131,13 @@
   %feature("pythonappend") nexus::NexusSchedulerDriver::NexusSchedulerDriver %{
         self.scheduler = args[0]
   %}
-#endif
+
+  /* Add a reference to executor in the Python wrapper object to prevent it
+     from being garbage-collected while the NexusExecutorDriver exists */
+  %feature("pythonappend") nexus::NexusExecutorDriver::NexusExecutorDriver %{
+        self.executor = args[0]
+  %}
+#endif /* SWIGPYTHON */
 
 #ifdef SWIGRUBY
   /* Hide NexusSchedulerDriver::getScheduler because it would require

Modified: incubator/mesos/trunk/src/swig/python/test_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/swig/python/test_exec.py?rev=1131554&r1=1131553&r2=1131554&view=diff
==============================================================================
--- incubator/mesos/trunk/src/swig/python/test_exec.py (original)
+++ incubator/mesos/trunk/src/swig/python/test_exec.py Sun Jun  5 03:22:00 2011
@@ -7,18 +7,17 @@ class MyExecutor(nexus.Executor):
   def __init__(self):
     nexus.Executor.__init__(self)
 
-  def startTask(self, task):
+  def startTask(self, driver, task):
     print "Running task %d" % task.taskId
     time.sleep(1)
     print "Sending the update..."
     update = nexus.TaskStatus(task.taskId, nexus.TASK_FINISHED, "")
-    self.sendStatusUpdate(update)
+    driver.sendStatusUpdate(update)
     print "Sent the update"
 
-  def error(self, code, message):
+  def error(self, driver, code, message):
     print "Error: %s" % message
 
 if __name__ == "__main__":
   print "Starting executor"
-  executor = MyExecutor()
-  executor.run()
+  nexus.NexusExecutorDriver(MyExecutor()).run()