You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/11/18 20:58:00 UTC

[2/2] git commit: Fixed MESOS-243: Made ExecutorDriver.join() wait for outstanding requests.

Fixed MESOS-243: Made ExecutorDriver.join() wait for outstanding requests.

Review: https://reviews.apache.org/r/15597


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd89ea35
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd89ea35
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd89ea35

Branch: refs/heads/master
Commit: dd89ea359ec55fbc90b5718d9cdbf021f189c2fa
Parents: ce43137
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Nov 15 14:02:58 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Mon Nov 18 11:27:10 2013 -0800

----------------------------------------------------------------------
 src/exec/exec.cpp         | 36 ++++++++++++++++++++++--------------
 src/launcher/executor.cpp |  4 ----
 2 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dd89ea35/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index c866838..a58203b 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -107,7 +107,9 @@ public:
                   bool _local,
                   const string& _directory,
                   bool _checkpoint,
-                  Duration _recoveryTimeout)
+                  Duration _recoveryTimeout,
+                  pthread_mutex_t* _mutex,
+                  pthread_cond_t* _cond)
     : ProcessBase(ID::generate("executor")),
       slave(_slave),
       driver(_driver),
@@ -119,6 +121,8 @@ public:
       connection(UUID::random()),
       local(_local),
       aborted(false),
+      mutex(_mutex),
+      cond(_cond),
       directory(_directory),
       checkpoint(_checkpoint),
       recoveryTimeout(_recoveryTimeout)
@@ -392,10 +396,21 @@ protected:
     }
   }
 
+  void stop()
+  {
+    terminate(self());
+
+    Lock lock(mutex);
+    pthread_cond_signal(cond);
+  }
+
   void abort()
   {
     VLOG(1) << "De-activating the executor libprocess";
     aborted = true;
+
+    Lock lock(mutex);
+    pthread_cond_signal(cond);
   }
 
   void _recoveryTimeout(UUID _connection)
@@ -534,6 +549,8 @@ private:
   UUID connection; // UUID to identify the connection instance.
   bool local;
   bool aborted;
+  pthread_mutex_t* mutex;
+  pthread_cond_t* cond;
   const string directory;
   bool checkpoint;
   Duration recoveryTimeout;
@@ -685,7 +702,9 @@ Status MesosExecutorDriver::start()
       local,
       workDirectory,
       checkpoint,
-      recoveryTimeout);
+      recoveryTimeout,
+      &mutex,
+      &cond);
 
   spawn(process);
 
@@ -703,13 +722,7 @@ Status MesosExecutorDriver::stop()
 
   CHECK(process != NULL);
 
-  terminate(process);
-
-  // TODO(benh): Set the condition variable in ExecutorProcess just as
-  // we do with the MesosSchedulerDriver and SchedulerProcess:
-  // dispatch(process, &ExecutorProcess::stop);
-
-  pthread_cond_signal(&cond);
+  dispatch(process, &ExecutorProcess::stop);
 
   bool aborted = status == DRIVER_ABORTED;
 
@@ -729,13 +742,8 @@ Status MesosExecutorDriver::abort()
 
   CHECK(process != NULL);
 
-  // TODO(benh): Set the condition variable in ExecutorProcess just as
-  // we do with the MesosSchedulerDriver and SchedulerProcess.
-
   dispatch(process, &ExecutorProcess::abort);
 
-  pthread_cond_signal(&cond);
-
   return status = DRIVER_ABORTED;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd89ea35/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index b73ab47..2e53bb8 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -284,10 +284,6 @@ private:
     taskStatus.set_message(message);
 
     driver->sendStatusUpdate(taskStatus);
-
-    // A hack for now ... but we need to wait until the status update
-    // is sent to the slave before we shut ourselves down.
-    os::sleep(Seconds(1));
     driver->stop();
   }