You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:37 UTC

[19/21] mesos git commit: Update Mesos executor to use synchronized.

Update Mesos executor to use synchronized.

Review: https://reviews.apache.org/r/35097


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8939609d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8939609d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8939609d

Branch: refs/heads/master
Commit: 8939609d403aa1043d637cc03647c4ee40478b20
Parents: b2d8047
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:12:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp | 287 +++++++++++++++++++++++++------------------------
 1 file changed, 145 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8939609d/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 0dfd5a6..930dda9 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -43,9 +43,9 @@
 #include <stout/os.hpp>
 #include <stout/stopwatch.hpp>
 #include <stout/stringify.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/uuid.hpp>
 
-#include "common/lock.hpp"
 #include "common/protobuf_utils.hpp"
 
 #include "logging/flags.hpp"
@@ -404,8 +404,9 @@ protected:
   {
     terminate(self());
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void abort()
@@ -413,8 +414,9 @@ protected:
     LOG(INFO) << "Deactivating the executor libprocess";
     CHECK(aborted);
 
-    Lock lock(mutex);
-    pthread_cond_signal(cond);
+    synchronized (mutex) {
+      pthread_cond_signal(cond);
+    }
   }
 
   void _recoveryTimeout(UUID _connection)
@@ -611,173 +613,174 @@ MesosExecutorDriver::~MesosExecutorDriver()
 
 Status MesosExecutorDriver::start()
 {
-  Lock lock(&mutex);
+  synchronized (mutex) {
+    if (status != DRIVER_NOT_STARTED) {
+      return status;
+    }
 
-  if (status != DRIVER_NOT_STARTED) {
-    return status;
-  }
+    // Set stream buffering mode to flush on newlines so that we
+    // capture logs from user processes even when output is redirected
+    // to a file.
+    setvbuf(stdout, 0, _IOLBF, 0);
+    setvbuf(stderr, 0, _IOLBF, 0);
 
-  // Set stream buffering mode to flush on newlines so that we capture logs
-  // from user processes even when output is redirected to a file.
-  setvbuf(stdout, 0, _IOLBF, 0);
-  setvbuf(stderr, 0, _IOLBF, 0);
+    bool local;
 
-  bool local;
+    UPID slave;
+    SlaveID slaveId;
+    FrameworkID frameworkId;
+    ExecutorID executorId;
+    string workDirectory;
+    bool checkpoint;
 
-  UPID slave;
-  SlaveID slaveId;
-  FrameworkID frameworkId;
-  ExecutorID executorId;
-  string workDirectory;
-  bool checkpoint;
-
-  Option<string> value;
-  std::istringstream iss;
+    Option<string> value;
+    std::istringstream iss;
 
-  // Check if this is local (for example, for testing).
-  local = os::getenv("MESOS_LOCAL").isSome();
+    // Check if this is local (for example, for testing).
+    local = os::getenv("MESOS_LOCAL").isSome();
 
-  // Get slave PID from environment.
-  value = os::getenv("MESOS_SLAVE_PID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
-  }
+    // Get slave PID from environment.
+    value = os::getenv("MESOS_SLAVE_PID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
+    }
 
-  slave = UPID(value.get());
-  CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
+    slave = UPID(value.get());
+    CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
 
-  // Get slave ID from environment.
-  value = os::getenv("MESOS_SLAVE_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
-  }
-  slaveId.set_value(value.get());
+    // Get slave ID from environment.
+    value = os::getenv("MESOS_SLAVE_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
+    }
+    slaveId.set_value(value.get());
 
-  // Get framework ID from environment.
-  value = os::getenv("MESOS_FRAMEWORK_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
-  }
-  frameworkId.set_value(value.get());
+    // Get framework ID from environment.
+    value = os::getenv("MESOS_FRAMEWORK_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
+    }
+    frameworkId.set_value(value.get());
 
-  // Get executor ID from environment.
-  value = os::getenv("MESOS_EXECUTOR_ID");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
-  }
-  executorId.set_value(value.get());
+    // Get executor ID from environment.
+    value = os::getenv("MESOS_EXECUTOR_ID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
+    }
+    executorId.set_value(value.get());
 
-  // Get working directory from environment.
-  value = os::getenv("MESOS_DIRECTORY");
-  if (value.isNone()) {
-    EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
-  }
-  workDirectory = value.get();
+    // Get working directory from environment.
+    value = os::getenv("MESOS_DIRECTORY");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
+    }
+    workDirectory = value.get();
 
-  // Get checkpointing status from environment.
-  value = os::getenv("MESOS_CHECKPOINT");
-  checkpoint = value.isSome() && value.get() == "1";
+    // Get checkpointing status from environment.
+    value = os::getenv("MESOS_CHECKPOINT");
+    checkpoint = value.isSome() && value.get() == "1";
 
-  Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+    Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
 
-  // Get the recovery timeout if checkpointing is enabled.
-  if (checkpoint) {
-    value = os::getenv("MESOS_RECOVERY_TIMEOUT");
+    // Get the recovery timeout if checkpointing is enabled.
+    if (checkpoint) {
+      value = os::getenv("MESOS_RECOVERY_TIMEOUT");
 
-    if (value.isSome()) {
-      Try<Duration> _recoveryTimeout = Duration::parse(value.get());
+      if (value.isSome()) {
+        Try<Duration> _recoveryTimeout = Duration::parse(value.get());
 
-      CHECK_SOME(_recoveryTimeout)
-          << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
-          << _recoveryTimeout.error();
+        CHECK_SOME(_recoveryTimeout)
+            << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
+            << _recoveryTimeout.error();
 
-      recoveryTimeout = _recoveryTimeout.get();
+        recoveryTimeout = _recoveryTimeout.get();
+      }
     }
-  }
 
-  CHECK(process == NULL);
-
-  process = new ExecutorProcess(
-      slave,
-      this,
-      executor,
-      slaveId,
-      frameworkId,
-      executorId,
-      local,
-      workDirectory,
-      checkpoint,
-      recoveryTimeout,
-      &mutex,
-      &cond);
-
-  spawn(process);
-
-  return status = DRIVER_RUNNING;
+    CHECK(process == NULL);
+
+    process = new ExecutorProcess(
+        slave,
+        this,
+        executor,
+        slaveId,
+        frameworkId,
+        executorId,
+        local,
+        workDirectory,
+        checkpoint,
+        recoveryTimeout,
+        &mutex,
+        &cond);
+
+    spawn(process);
+
+    return status = DRIVER_RUNNING;
+  }
 }
 
 
 Status MesosExecutorDriver::stop()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::stop);
+    dispatch(process, &ExecutorProcess::stop);
 
-  bool aborted = status == DRIVER_ABORTED;
+    bool aborted = status == DRIVER_ABORTED;
 
-  status = DRIVER_STOPPED;
+    status = DRIVER_STOPPED;
 
-  return aborted ? DRIVER_ABORTED : status;
+    return aborted ? DRIVER_ABORTED : status;
+  }
 }
 
 
 Status MesosExecutorDriver::abort()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  // We set the volatile aborted to true here to prevent any further
-  // messages from being processed in the ExecutorProcess. However,
-  // if abort() is called from another thread as the ExecutorProcess,
-  // there may be at most one additional message processed.
-  // TODO(bmahler): Use an atomic boolean.
-  process->aborted = true;
+    // We set the volatile aborted to true here to prevent any further
+    // messages from being processed in the ExecutorProcess. However,
+    // if abort() is called from another thread as the ExecutorProcess,
+    // there may be at most one additional message processed.
+    // TODO(bmahler): Use an atomic boolean.
+    process->aborted = true;
 
-  // Dispatching here ensures that we still process the outstanding
-  // requests *from* the executor, since those do proceed when
-  // aborted is true.
-  dispatch(process, &ExecutorProcess::abort);
+    // Dispatching here ensures that we still process the outstanding
+    // requests *from* the executor, since those do proceed when
+    // aborted is true.
+    dispatch(process, &ExecutorProcess::abort);
 
-  return status = DRIVER_ABORTED;
+    return status = DRIVER_ABORTED;
+  }
 }
 
 
 Status MesosExecutorDriver::join()
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  while (status == DRIVER_RUNNING) {
-    pthread_cond_wait(&cond, &mutex);
-  }
+    while (status == DRIVER_RUNNING) {
+      pthread_cond_wait(&cond, &mutex);
+    }
 
-  CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+    CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
-  return status;
+    return status;
+  }
 }
 
 
@@ -790,31 +793,31 @@ Status MesosExecutorDriver::run()
 
 Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
+    dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
 
-  return status;
+    return status;
+  }
 }
 
 
 Status MesosExecutorDriver::sendFrameworkMessage(const string& data)
 {
-  Lock lock(&mutex);
-
-  if (status != DRIVER_RUNNING) {
-    return status;
-  }
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
 
-  CHECK(process != NULL);
+    CHECK(process != NULL);
 
-  dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
+    dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
 
-  return status;
+    return status;
+  }
 }