You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 11:46:37 UTC
[19/21] mesos git commit: Update Mesos executor to use synchronized.
Update Mesos executor to use synchronized.
Review: https://reviews.apache.org/r/35097
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8939609d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8939609d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8939609d
Branch: refs/heads/master
Commit: 8939609d403aa1043d637cc03647c4ee40478b20
Parents: b2d8047
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Jun 13 07:12:31 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 02:43:01 2015 -0700
----------------------------------------------------------------------
src/exec/exec.cpp | 287 +++++++++++++++++++++++++------------------------
1 file changed, 145 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8939609d/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 0dfd5a6..930dda9 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -43,9 +43,9 @@
#include <stout/os.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
+#include <stout/synchronized.hpp>
#include <stout/uuid.hpp>
-#include "common/lock.hpp"
#include "common/protobuf_utils.hpp"
#include "logging/flags.hpp"
@@ -404,8 +404,9 @@ protected:
{
terminate(self());
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void abort()
@@ -413,8 +414,9 @@ protected:
LOG(INFO) << "Deactivating the executor libprocess";
CHECK(aborted);
- Lock lock(mutex);
- pthread_cond_signal(cond);
+ synchronized (mutex) {
+ pthread_cond_signal(cond);
+ }
}
void _recoveryTimeout(UUID _connection)
@@ -611,173 +613,174 @@ MesosExecutorDriver::~MesosExecutorDriver()
Status MesosExecutorDriver::start()
{
- Lock lock(&mutex);
+ synchronized (mutex) {
+ if (status != DRIVER_NOT_STARTED) {
+ return status;
+ }
- if (status != DRIVER_NOT_STARTED) {
- return status;
- }
+ // Set stream buffering mode to flush on newlines so that we
+ // capture logs from user processes even when output is redirected
+ // to a file.
+ setvbuf(stdout, 0, _IOLBF, 0);
+ setvbuf(stderr, 0, _IOLBF, 0);
- // Set stream buffering mode to flush on newlines so that we capture logs
- // from user processes even when output is redirected to a file.
- setvbuf(stdout, 0, _IOLBF, 0);
- setvbuf(stderr, 0, _IOLBF, 0);
+ bool local;
- bool local;
+ UPID slave;
+ SlaveID slaveId;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+ string workDirectory;
+ bool checkpoint;
- UPID slave;
- SlaveID slaveId;
- FrameworkID frameworkId;
- ExecutorID executorId;
- string workDirectory;
- bool checkpoint;
-
- Option<string> value;
- std::istringstream iss;
+ Option<string> value;
+ std::istringstream iss;
- // Check if this is local (for example, for testing).
- local = os::getenv("MESOS_LOCAL").isSome();
+ // Check if this is local (for example, for testing).
+ local = os::getenv("MESOS_LOCAL").isSome();
- // Get slave PID from environment.
- value = os::getenv("MESOS_SLAVE_PID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
- }
+ // Get slave PID from environment.
+ value = os::getenv("MESOS_SLAVE_PID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment.";
+ }
- slave = UPID(value.get());
- CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
+ slave = UPID(value.get());
+ CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
- // Get slave ID from environment.
- value = os::getenv("MESOS_SLAVE_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
- }
- slaveId.set_value(value.get());
+ // Get slave ID from environment.
+ value = os::getenv("MESOS_SLAVE_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_SLAVE_ID' to be set in the environment.";
+ }
+ slaveId.set_value(value.get());
- // Get framework ID from environment.
- value = os::getenv("MESOS_FRAMEWORK_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
- }
- frameworkId.set_value(value.get());
+ // Get framework ID from environment.
+ value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment.";
+ }
+ frameworkId.set_value(value.get());
- // Get executor ID from environment.
- value = os::getenv("MESOS_EXECUTOR_ID");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
- }
- executorId.set_value(value.get());
+ // Get executor ID from environment.
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment.";
+ }
+ executorId.set_value(value.get());
- // Get working directory from environment.
- value = os::getenv("MESOS_DIRECTORY");
- if (value.isNone()) {
- EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
- }
- workDirectory = value.get();
+ // Get working directory from environment.
+ value = os::getenv("MESOS_DIRECTORY");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_DIRECTORY' to be set in the environment.";
+ }
+ workDirectory = value.get();
- // Get checkpointing status from environment.
- value = os::getenv("MESOS_CHECKPOINT");
- checkpoint = value.isSome() && value.get() == "1";
+ // Get checkpointing status from environment.
+ value = os::getenv("MESOS_CHECKPOINT");
+ checkpoint = value.isSome() && value.get() == "1";
- Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
+ Duration recoveryTimeout = slave::RECOVERY_TIMEOUT;
- // Get the recovery timeout if checkpointing is enabled.
- if (checkpoint) {
- value = os::getenv("MESOS_RECOVERY_TIMEOUT");
+ // Get the recovery timeout if checkpointing is enabled.
+ if (checkpoint) {
+ value = os::getenv("MESOS_RECOVERY_TIMEOUT");
- if (value.isSome()) {
- Try<Duration> _recoveryTimeout = Duration::parse(value.get());
+ if (value.isSome()) {
+ Try<Duration> _recoveryTimeout = Duration::parse(value.get());
- CHECK_SOME(_recoveryTimeout)
- << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
- << _recoveryTimeout.error();
+ CHECK_SOME(_recoveryTimeout)
+ << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
+ << _recoveryTimeout.error();
- recoveryTimeout = _recoveryTimeout.get();
+ recoveryTimeout = _recoveryTimeout.get();
+ }
}
- }
- CHECK(process == NULL);
-
- process = new ExecutorProcess(
- slave,
- this,
- executor,
- slaveId,
- frameworkId,
- executorId,
- local,
- workDirectory,
- checkpoint,
- recoveryTimeout,
- &mutex,
- &cond);
-
- spawn(process);
-
- return status = DRIVER_RUNNING;
+ CHECK(process == NULL);
+
+ process = new ExecutorProcess(
+ slave,
+ this,
+ executor,
+ slaveId,
+ frameworkId,
+ executorId,
+ local,
+ workDirectory,
+ checkpoint,
+ recoveryTimeout,
+ &mutex,
+ &cond);
+
+ spawn(process);
+
+ return status = DRIVER_RUNNING;
+ }
}
Status MesosExecutorDriver::stop()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::stop);
+ dispatch(process, &ExecutorProcess::stop);
- bool aborted = status == DRIVER_ABORTED;
+ bool aborted = status == DRIVER_ABORTED;
- status = DRIVER_STOPPED;
+ status = DRIVER_STOPPED;
- return aborted ? DRIVER_ABORTED : status;
+ return aborted ? DRIVER_ABORTED : status;
+ }
}
Status MesosExecutorDriver::abort()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- // We set the volatile aborted to true here to prevent any further
- // messages from being processed in the ExecutorProcess. However,
- // if abort() is called from another thread as the ExecutorProcess,
- // there may be at most one additional message processed.
- // TODO(bmahler): Use an atomic boolean.
- process->aborted = true;
+ // We set the volatile aborted to true here to prevent any further
+ // messages from being processed in the ExecutorProcess. However,
+ // if abort() is called from another thread as the ExecutorProcess,
+ // there may be at most one additional message processed.
+ // TODO(bmahler): Use an atomic boolean.
+ process->aborted = true;
- // Dispatching here ensures that we still process the outstanding
- // requests *from* the executor, since those do proceed when
- // aborted is true.
- dispatch(process, &ExecutorProcess::abort);
+ // Dispatching here ensures that we still process the outstanding
+ // requests *from* the executor, since those do proceed when
+ // aborted is true.
+ dispatch(process, &ExecutorProcess::abort);
- return status = DRIVER_ABORTED;
+ return status = DRIVER_ABORTED;
+ }
}
Status MesosExecutorDriver::join()
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- while (status == DRIVER_RUNNING) {
- pthread_cond_wait(&cond, &mutex);
- }
+ while (status == DRIVER_RUNNING) {
+ pthread_cond_wait(&cond, &mutex);
+ }
- CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
+ CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
- return status;
+ return status;
+ }
}
@@ -790,31 +793,31 @@ Status MesosExecutorDriver::run()
Status MesosExecutorDriver::sendStatusUpdate(const TaskStatus& taskStatus)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
+ dispatch(process, &ExecutorProcess::sendStatusUpdate, taskStatus);
- return status;
+ return status;
+ }
}
Status MesosExecutorDriver::sendFrameworkMessage(const string& data)
{
- Lock lock(&mutex);
-
- if (status != DRIVER_RUNNING) {
- return status;
- }
+ synchronized (mutex) {
+ if (status != DRIVER_RUNNING) {
+ return status;
+ }
- CHECK(process != NULL);
+ CHECK(process != NULL);
- dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
+ dispatch(process, &ExecutorProcess::sendFrameworkMessage, data);
- return status;
+ return status;
+ }
}