You are viewing a plain-text version of this archived message; the link to its canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/27 02:25:15 UTC
svn commit: r1236485 [1/7] - in /incubator/mesos/trunk: ./ include/mesos/
src/common/ src/exec/ src/local/ src/log/ src/master/ src/python/native/
src/sched/ src/slave/ src/tests/ src/zookeeper/ third_party/libprocess/
third_party/libprocess/include/pr...
Author: benh
Date: Fri Jan 27 01:25:13 2012
New Revision: 1236485
URL: http://svn.apache.org/viewvc?rev=1236485&view=rev
Log:
Updating libprocess to support Mac OS X Lion (10.7) and corresponding Mesos changes.
Added:
incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/defer.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/deferred.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/event.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/executor.hpp
- copied, changed from r1235899, incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/filter.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/message.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/preprocessor.hpp
incubator/mesos/trunk/third_party/libprocess/src/thread.hpp
Removed:
incubator/mesos/trunk/third_party/libprocess/include/process/async.hpp
Modified:
incubator/mesos/trunk/configure.ac
incubator/mesos/trunk/include/mesos/mesos.proto
incubator/mesos/trunk/include/mesos/scheduler.hpp
incubator/mesos/trunk/src/common/lock.cpp
incubator/mesos/trunk/src/common/lock.hpp
incubator/mesos/trunk/src/common/type_utils.hpp
incubator/mesos/trunk/src/exec/exec.cpp
incubator/mesos/trunk/src/local/local.cpp
incubator/mesos/trunk/src/log/coordinator.cpp
incubator/mesos/trunk/src/log/log.cpp
incubator/mesos/trunk/src/log/log.hpp
incubator/mesos/trunk/src/log/network.hpp
incubator/mesos/trunk/src/log/replica.cpp
incubator/mesos/trunk/src/master/frameworks_manager.cpp
incubator/mesos/trunk/src/master/frameworks_manager.hpp
incubator/mesos/trunk/src/master/http.cpp
incubator/mesos/trunk/src/master/http.hpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/master/slaves_manager.cpp
incubator/mesos/trunk/src/master/slaves_manager.hpp
incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
incubator/mesos/trunk/src/python/native/proxy_scheduler.cpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/http.cpp
incubator/mesos/trunk/src/slave/http.hpp
incubator/mesos/trunk/src/slave/reaper.cpp
incubator/mesos/trunk/src/slave/reaper.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/exception_tests.cpp
incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
incubator/mesos/trunk/src/tests/log_tests.cpp
incubator/mesos/trunk/src/tests/master_tests.cpp
incubator/mesos/trunk/src/tests/utils.hpp
incubator/mesos/trunk/src/zookeeper/group.cpp
incubator/mesos/trunk/src/zookeeper/zookeeper.cpp
incubator/mesos/trunk/third_party/libprocess/Makefile.in
incubator/mesos/trunk/third_party/libprocess/include/process/dispatch.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/future.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/gc.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/latch.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/process.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/protobuf.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/run.hpp
incubator/mesos/trunk/third_party/libprocess/include/process/timer.hpp
incubator/mesos/trunk/third_party/libprocess/src/encoder.hpp
incubator/mesos/trunk/third_party/libprocess/src/foreach.hpp
incubator/mesos/trunk/third_party/libprocess/src/latch.cpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
incubator/mesos/trunk/third_party/libprocess/src/timer.cpp
Modified: incubator/mesos/trunk/configure.ac
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/configure.ac?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/configure.ac (original)
+++ incubator/mesos/trunk/configure.ac Fri Jan 27 01:25:13 2012
@@ -128,8 +128,6 @@ case "${target_os}" in
echo Setting up build environment for ${target_cpu} ${target_os}
echo ===========================================================
OS_NAME=darwin
- CFLAGS="$CFLAGS -D_XOPEN_SOURCE"
- CXXFLAGS="$CXXFLAGS -D_XOPEN_SOURCE"
;;
solaris*)
echo ===========================================================
Modified: incubator/mesos/trunk/include/mesos/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/mesos.proto?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos/mesos.proto Fri Jan 27 01:25:13 2012
@@ -110,6 +110,18 @@ message ExecutorInfo {
/**
+ * Describes a master. This will probably have more fields in the
+ * future which might be used, for example, to link a framework webui
+ * to a master webui.
+ */
+message MasterInfo {
+ required string id = 1;
+ required uint32 ip = 2;
+ required uint32 port = 3;
+}
+
+
+/**
* Describes a slave. The 'webui_hostname' and 'webui_port' are
* provided in the event a host has different private and public
* hostnames (e.g., Amazon EC2).
Modified: incubator/mesos/trunk/include/mesos/scheduler.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/scheduler.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/scheduler.hpp (original)
+++ incubator/mesos/trunk/include/mesos/scheduler.hpp Fri Jan 27 01:25:13 2012
@@ -284,7 +284,7 @@ public:
* variables, as well as any configuration files found through the
* environment variables.
*/
- MesosSchedulerDriver(Scheduler* sched,
+ MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
const std::string& url,
@@ -299,7 +299,7 @@ public:
* Additional Mesos config options are obtained from the 'params'
* argument.
*/
- MesosSchedulerDriver(Scheduler* sched,
+ MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
const std::map<std::string, std::string>& params,
@@ -311,7 +311,7 @@ public:
* command-line (via 'argc' and 'argv'). Optionally providing an
* existing framework ID can be used to failover a framework.
*/
- MesosSchedulerDriver(Scheduler* sched,
+ MesosSchedulerDriver(Scheduler* scheduler,
const std::string& frameworkName,
const ExecutorInfo& executorInfo,
int argc,
@@ -349,16 +349,13 @@ public:
private:
// Initialization method used by constructors.
- void init(Scheduler* sched,
+ void init(Scheduler* scheduler,
internal::Configuration* conf,
const FrameworkID& frameworkId,
const std::string& frameworkName,
const ExecutorInfo& executorInfo);
- // Internal utility method to report an error to the scheduler.
- void error(int code, const std::string& message);
-
- Scheduler* sched;
+ Scheduler* scheduler;
std::string url;
FrameworkID frameworkId;
std::string frameworkName;
Modified: incubator/mesos/trunk/src/common/lock.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/lock.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/lock.cpp (original)
+++ incubator/mesos/trunk/src/common/lock.cpp Fri Jan 27 01:25:13 2012
@@ -18,16 +18,38 @@
#include "lock.hpp"
-using namespace mesos::internal;
+namespace mesos {
+namespace internal {
+Lock::Lock(pthread_mutex_t* _mutex)
+ : mutex(_mutex), locked(false) // lock() reads 'locked', so it must be initialized first.
+{
+ lock();
+}
+
+
+void Lock::lock()
+{
+ if (!locked) {
+ pthread_mutex_lock(mutex);
+ locked = true;
+ }
+}
-Lock::Lock(pthread_mutex_t* _mutex): mutex(_mutex)
+
+void Lock::unlock()
{
- pthread_mutex_lock(mutex);
+ if (locked) {
+ pthread_mutex_unlock(mutex);
+ locked = false;
+ }
}
Lock::~Lock()
{
- pthread_mutex_unlock(mutex);
+ unlock();
}
+
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/src/common/lock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/lock.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/lock.hpp (original)
+++ incubator/mesos/trunk/src/common/lock.hpp Fri Jan 27 01:25:13 2012
@@ -16,27 +16,30 @@
* limitations under the License.
*/
-#ifndef LOCK_HPP
-#define LOCK_HPP
+#ifndef __LOCK_HPP__
+#define __LOCK_HPP__
#include <pthread.h>
+namespace mesos {
+namespace internal {
-namespace mesos { namespace internal {
-
-/**
- * RAII class for locking pthread_mutexes.
- */
+// RAII class for locking pthread_mutexes.
class Lock
{
- pthread_mutex_t* mutex;
-
public:
Lock(pthread_mutex_t* _mutex);
~Lock();
-};
-}} /* namespace mesos { namespace internal { */
+ void lock();
+ void unlock();
+
+private:
+ pthread_mutex_t* mutex;
+ bool locked;
+};
+} // namespace internal {
+} // namespace mesos {
-#endif /* LOCK_HPP */
+#endif // __LOCK_HPP__
Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Fri Jan 27 01:25:13 2012
@@ -73,6 +73,13 @@ inline std::ostream& operator << (std::o
}
+inline std::ostream& operator << (std::ostream& stream, const TaskDescription& task)
+{
+ stream << task.DebugString();
+ return stream;
+}
+
+
inline bool operator == (const FrameworkID& left, const FrameworkID& right)
{
return left.value() == right.value();
Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Fri Jan 27 01:25:13 2012
@@ -24,8 +24,6 @@
#include <string>
#include <sstream>
-#include <boost/bind.hpp>
-
#include <mesos/executor.hpp>
#include <process/dispatch.hpp>
@@ -46,14 +44,13 @@ using namespace mesos::internal;
using namespace process;
-using boost::bind;
-
using std::string;
using process::wait; // Necessary on some OS's to disambiguate.
-namespace mesos { namespace internal {
+namespace mesos {
+namespace internal {
class ExecutorProcess : public ProtobufProcess<ExecutorProcess>
{
@@ -74,35 +71,33 @@ public:
aborted(false),
directory(_directory)
{
- installProtobufHandler<ExecutorRegisteredMessage>(
+ install<ExecutorRegisteredMessage>(
&ExecutorProcess::registered,
&ExecutorRegisteredMessage::args);
- installProtobufHandler<RunTaskMessage>(
+ install<RunTaskMessage>(
&ExecutorProcess::runTask,
&RunTaskMessage::task);
- installProtobufHandler<KillTaskMessage>(
+ install<KillTaskMessage>(
&ExecutorProcess::killTask,
&KillTaskMessage::task_id);
- installProtobufHandler<FrameworkToExecutorMessage>(
+ install<FrameworkToExecutorMessage>(
&ExecutorProcess::frameworkMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
- installProtobufHandler<ShutdownExecutorMessage>(
+ install<ShutdownExecutorMessage>(
&ExecutorProcess::shutdown);
-
- installMessageHandler(EXITED, &ExecutorProcess::exited);
}
virtual ~ExecutorProcess() {}
protected:
- virtual void operator () ()
+ virtual void initialize()
{
VLOG(1) << "Executor started at: " << self();
@@ -113,8 +108,6 @@ protected:
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
send(slave, message);
-
- do { if (serve() == TERMINATE) break; } while (true);
}
void registered(const ExecutorArgs& args)
@@ -127,7 +120,7 @@ protected:
VLOG(1) << "Executor registered on slave " << args.slave_id();
slaveId = args.slave_id();
- invoke(bind(&Executor::init, executor, driver, args));
+ executor->init(driver, args);
}
void runTask(const TaskDescription& task)
@@ -139,7 +132,7 @@ protected:
VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";
- invoke(bind(&Executor::launchTask, executor, driver, task));
+ executor->launchTask(driver, task);
}
void killTask(const TaskID& taskId)
@@ -151,7 +144,7 @@ protected:
VLOG(1) << "Executor asked to kill task '" << taskId << "'";
- invoke(bind(&Executor::killTask, executor, driver, taskId));
+ executor->killTask(driver, taskId);
}
void frameworkMessage(const SlaveID& slaveId,
@@ -166,7 +159,7 @@ protected:
VLOG(1) << "Executor received framework message";
- invoke(bind(&Executor::frameworkMessage, executor, driver, data));
+ executor->frameworkMessage(driver, data);
}
void shutdown()
@@ -179,7 +172,7 @@ protected:
VLOG(1) << "Executor asked to shutdown";
// TODO(benh): Any need to invoke driver.stop?
- invoke(bind(&Executor::shutdown, executor, driver));
+ executor->shutdown(driver);
if (!local) {
exit(0);
} else {
@@ -193,7 +186,7 @@ protected:
aborted = true;
}
- void exited()
+ virtual void exited(const UPID& pid)
{
if (aborted) {
VLOG(1) << "Ignoring exited event because the driver is aborted!";
@@ -203,7 +196,7 @@ protected:
VLOG(1) << "Slave exited, trying to shutdown";
// TODO: Pass an argument to shutdown to tell it this is abnormal?
- invoke(bind(&Executor::shutdown, executor, driver));
+ executor->shutdown(driver);
// This is a pretty bad state ... no slave is left. Rather
// than exit lets kill our process group (which includes
@@ -225,7 +218,7 @@ protected:
update->mutable_executor_id()->MergeFrom(executorId);
update->mutable_slave_id()->MergeFrom(slaveId);
update->mutable_status()->MergeFrom(status);
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(slave, message);
}
@@ -254,7 +247,8 @@ private:
const std::string directory;
};
-}} // namespace mesos { namespace internal {
+} // namespace internal {
+} // namespace mesos {
// Implementation of C++ API.
@@ -397,6 +391,11 @@ Status MesosExecutorDriver::stop()
terminate(process);
state = STOPPED;
+
+ // TODO(benh): Set the condition variable in ExecutorProcess just as
+ // we do with the MesosSchedulerDriver and SchedulerProcess:
+ // dispatch(process, &ExecutorProcess::stop);
+
pthread_cond_signal(&cond);
return OK;
@@ -419,6 +418,9 @@ Status MesosExecutorDriver::abort()
CHECK(process != NULL);
+ // TODO(benh): Set the condition variable in ExecutorProcess just as
+ // we do with the MesosSchedulerDriver and SchedulerProcess.
+
dispatch(process, &ExecutorProcess::abort);
pthread_cond_signal(&cond);
Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Fri Jan 27 01:25:13 2012
@@ -118,6 +118,8 @@ PID<Master> launch(const Configuration&
vector<UPID> pids;
+ // TODO(benh): Launching more than one slave is actually not kosher
+ // since each slave tries to take the "slave" id.
for (int i = 0; i < numSlaves; i++) {
// TODO(benh): Create a local isolation module?
ProcessBasedIsolationModule *isolationModule =
@@ -136,7 +138,7 @@ PID<Master> launch(const Configuration&
void shutdown()
{
if (master != NULL) {
- process::post(master->self(), process::TERMINATE);
+ process::terminate(master->self());
process::wait(master->self());
delete master;
delete allocator;
@@ -149,7 +151,7 @@ void shutdown()
// we have stopped the slave.
foreachpair (IsolationModule* isolationModule, Slave* slave, slaves) {
- process::post(slave->self(), process::TERMINATE);
+ process::terminate(slave->self());
process::wait(slave->self());
delete isolationModule;
delete slave;
Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Fri Jan 27 01:25:13 2012
@@ -22,7 +22,6 @@
#include <process/future.hpp>
#include "common/foreach.hpp"
-#include "common/option.hpp"
#include "log/coordinator.hpp"
#include "log/replica.hpp"
@@ -64,9 +63,9 @@ Result<uint64_t> Coordinator::elect(cons
// Get the highest known promise from our local replica.
Future<uint64_t> promise = replica->promised();
- promise.await(); // TODO(benh): Take a timeout and use it here!
-
- if (promise.isFailed()) {
+ if (!promise.await(timeout.remaining())) {
+ return Result<uint64_t>::none();
+ } else if (promise.isFailed()) {
return Result<uint64_t>::error(promise.failure());
}
@@ -81,14 +80,13 @@ Result<uint64_t> Coordinator::elect(cons
set<Future<PromiseResponse> > futures =
broadcast(protocol::promise, request);
- Option<Future<PromiseResponse> > option;
int okays = 0;
do {
- option = select(futures, timeout.remaining());
- if (option.isSome()) {
- CHECK(option.get().isReady());
- const PromiseResponse& response = option.get().get();
+ Future<Future<PromiseResponse> > future = select(futures);
+ if (future.await(timeout.remaining())) {
+ CHECK(future.get().isReady());
+ const PromiseResponse& response = future.get().get();
if (!response.okay()) {
return Result<uint64_t>::none(); // Lost an election, but can retry.
} else if (response.okay()) {
@@ -99,9 +97,9 @@ Result<uint64_t> Coordinator::elect(cons
break;
}
}
- futures.erase(option.get());
+ futures.erase(future.get());
}
- } while (option.isSome());
+ } while (timeout.remaining() > 0);
// Discard the remaining futures.
discard(futures);
@@ -120,9 +118,10 @@ Result<uint64_t> Coordinator::elect(cons
Future<set<uint64_t> > positions = replica->missing(index);
- positions.await(timeout.remaining());
-
- if (positions.isFailed()) {
+ if (!positions.await(timeout.remaining())) {
+ elected = false;
+ return Result<uint64_t>::none();
+ } else if (positions.isFailed()) {
elected = false;
return Result<uint64_t>::error(positions.failure());
}
@@ -176,7 +175,7 @@ Result<uint64_t> Coordinator::append(
Action::Append* append = action.mutable_append();
append->set_bytes(bytes);
- Result<uint64_t> result = write(action, Timeout(timeout));
+ Result<uint64_t> result = write(action, timeout);
if (result.isSome()) {
CHECK(result.get() == index);
@@ -266,17 +265,15 @@ Result<uint64_t> Coordinator::write(
set<Future<WriteResponse> > futures =
remotecast(protocol::write, request);
- Option<Future<WriteResponse> > option;
int okays = 0;
do {
- option = select(futures, timeout.remaining());
- if (option.isSome()) {
- CHECK(option.get().isReady());
- const WriteResponse& response = option.get().get();
+ Future<Future<WriteResponse> > future = select(futures);
+ if (future.await(timeout.remaining())) {
+ CHECK(future.get().isReady());
+ const WriteResponse& response = future.get().get();
CHECK(response.id() == request.id());
CHECK(response.position() == request.position());
-
if (!response.okay()) {
elected = false;
return Result<uint64_t>::error("Coordinator demoted");
@@ -296,11 +293,11 @@ Result<uint64_t> Coordinator::write(
}
}
}
- futures.erase(option.get());
+ futures.erase(future.get());
}
- } while (option.isSome());
+ } while (timeout.remaining() > 0);
- // Timed out ...
+ // Timed out ... discard remaining futures.
discard(futures);
return Result<uint64_t>::none();
}
@@ -397,14 +394,13 @@ Result<Action> Coordinator::fill(uint64_
set<Future<PromiseResponse> > futures =
broadcast(protocol::promise, request);
- Option<Future<PromiseResponse> > option;
list<PromiseResponse> responses;
do {
- option = select(futures, timeout.remaining());
- if (option.isSome()) {
- CHECK(option.get().isReady());
- const PromiseResponse& response = option.get().get();
+ Future<Future<PromiseResponse> > future = select(futures);
+ if (future.await(timeout.remaining())) {
+ CHECK(future.get().isReady());
+ const PromiseResponse& response = future.get().get();
CHECK(response.id() == request.id());
if (!response.okay()) {
elected = false;
@@ -415,9 +411,9 @@ Result<Action> Coordinator::fill(uint64_
break;
}
}
- futures.erase(option.get());
+ futures.erase(future.get());
}
- } while (option.isSome());
+ } while (timeout.remaining() > 0);
// Discard the remaining futures.
discard(futures);
Modified: incubator/mesos/trunk/src/log/log.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.cpp (original)
+++ incubator/mesos/trunk/src/log/log.cpp Fri Jan 27 01:25:13 2012
@@ -241,7 +241,7 @@ public:
void updated(const string& path);
protected:
- virtual void operator () ();
+ virtual void initialize(); // Must match the base-class hook name exactly to override it.
private:
// Updates the group.
@@ -526,13 +526,11 @@ void LogProcess::updated(const string& p
}
-void LogProcess::operator () ()
+void LogProcess::initialize()
{
// TODO(benh): Real testing requires injecting a ZooKeeper instance.
watcher = new LogProcessWatcher(self());
zk = new ZooKeeper(servers, 10000, watcher);
-
- do { if (serve() == process::TERMINATE) break; } while (true);
}
Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Fri Jan 27 01:25:13 2012
@@ -212,13 +212,13 @@ public:
LOG(INFO) << "Attempting to join replica to ZooKeeper group";
membership = group->join(replica->pid())
- .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+ .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
group->watch()
- .onReady(dispatch(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+ .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
+ .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
}
~Log()
@@ -256,7 +256,7 @@ private:
zookeeper::Group* group;
process::Future<zookeeper::Group::Membership> membership;
- async::Dispatch dispatch;
+ process::Executor executor;
int quorum;
@@ -418,14 +418,14 @@ void Log::watch(const std::set<zookeeper
// Our replica's membership must have expired, join back up.
LOG(INFO) << "Renewing replica group membership";
membership = group->join(replica->pid())
- .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+ .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
}
group->watch(memberships)
- .onReady(dispatch(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(dispatch(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(dispatch(lambda::bind(&Log::discarded, this)));
+ .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
+ .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
}
Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Fri Jan 27 01:25:13 2012
@@ -25,8 +25,8 @@
#include <set>
#include <string>
-#include <process/async.hpp>
-#include <process/process.hpp>
+#include <process/deferred.hpp>
+#include <process/executor.hpp>
#include <process/protobuf.hpp>
#include <process/timeout.hpp>
@@ -34,7 +34,6 @@
#include "common/foreach.hpp"
#include "common/lambda.hpp"
-#include "common/option.hpp"
#include "common/seconds.hpp"
#include "common/utils.hpp"
@@ -107,7 +106,7 @@ private:
zookeeper::Group* group;
- async::Dispatch dispatch;
+ process::Executor executor;
};
@@ -216,6 +215,7 @@ inline void Network::remove(const proces
process::dispatch(process, &NetworkProcess::remove, pid);
}
+
inline void Network::set(const std::set<process::UPID>& pids)
{
process::dispatch(process, &NetworkProcess::set, pids);
@@ -239,8 +239,7 @@ void Network::broadcast(
const std::set<process::UPID>& filter)
{
// Need to disambiguate overloaded function.
- void (NetworkProcess::*broadcast) (
- const M&, const std::set<process::UPID>&) =
+ void (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&) =
&NetworkProcess::broadcast<M>;
process::dispatch(process, broadcast, m, filter);
@@ -256,12 +255,19 @@ inline ZooKeeperNetwork::ZooKeeperNetwor
inline void ZooKeeperNetwork::watch(
const std::set<zookeeper::Group::Membership>& memberships)
{
+ process::deferred<void(const std::set<zookeeper::Group::Membership>&)> ready =
+ executor.defer(lambda::bind(&ZooKeeperNetwork::ready, this, lambda::_1));
+
+ process::deferred<void(const std::string&)> failed =
+ executor.defer(lambda::bind(&ZooKeeperNetwork::failed, this, lambda::_1));
+
+ process::deferred<void(void)> discarded =
+ executor.defer(lambda::bind(&ZooKeeperNetwork::discarded, this));
+
group->watch(memberships)
- .onReady(dispatch(lambda::bind(&ZooKeeperNetwork::ready,
- this, lambda::_1)))
- .onFailed(dispatch(lambda::bind(&ZooKeeperNetwork::failed,
- this, lambda::_1)))
- .onDiscarded(dispatch(lambda::bind(&ZooKeeperNetwork::discarded, this)));
+ .onReady(ready)
+ .onFailed(failed)
+ .onDiscarded(discarded);
}
@@ -279,18 +285,16 @@ inline void ZooKeeperNetwork::ready(
std::set<process::UPID> pids;
- Option<process::Future<std::string> > option;
-
process::Timeout timeout = 5.0;
while (!futures.empty()) {
- option = select(futures, timeout.remaining());
- if (option.isSome()) {
- CHECK(option.get().isReady());
- process::UPID pid(option.get().get());
- CHECK(pid) << "Failed to parse '" << option.get().get() << "'";
+ process::Future<process::Future<std::string> > future = select(futures);
+ if (future.await(timeout.remaining())) {
+ CHECK(future.get().isReady());
+ process::UPID pid(future.get().get());
+ CHECK(pid) << "Failed to parse '" << future.get().get() << "'";
pids.insert(pid);
- futures.erase(option.get());
+ futures.erase(future.get());
} else {
watch(); // Try again later assuming empty group.
return;
Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Fri Jan 27 01:25:13 2012
@@ -459,7 +459,7 @@ public:
// Returns all the actions between the specified positions, unless
// those positions are invalid, in which case returns an error.
- process::Promise<std::list<Action> > read(uint64_t from, uint64_t to);
+ process::Future<std::list<Action> > read(uint64_t from, uint64_t to);
// Returns missing positions in the log (i.e., unlearned or holes)
// up to the specified position.
@@ -527,17 +527,17 @@ ReplicaProcess::ReplicaProcess(const str
recover(path);
// Install protobuf handlers.
- installProtobufHandler<PromiseRequest>(
+ install<PromiseRequest>(
&ReplicaProcess::promise);
- installProtobufHandler<WriteRequest>(
+ install<WriteRequest>(
&ReplicaProcess::write);
- installProtobufHandler<LearnedMessage>(
+ install<LearnedMessage>(
&ReplicaProcess::learned,
&LearnedMessage::action);
- installProtobufHandler<LearnRequest>(
+ install<LearnRequest>(
&ReplicaProcess::learn,
&LearnRequest::position);
}
@@ -574,22 +574,22 @@ Result<Action> ReplicaProcess::read(uint
// TODO(benh): Make this function actually return a Try once we change
// the future semantics to not include failures.
-process::Promise<list<Action> > ReplicaProcess::read(
+process::Future<list<Action> > ReplicaProcess::read(
uint64_t from,
uint64_t to)
{
if (to < from) {
process::Promise<list<Action> > promise;
promise.fail("Bad read range (to < from)");
- return promise;
+ return promise.future();
} else if (from < begin) {
process::Promise<list<Action> > promise;
promise.fail("Bad read range (truncated position)");
- return promise;
+ return promise.future();
} else if (end < to) {
process::Promise<list<Action> > promise;
promise.fail("Bad read range (past end of log)");
- return promise;
+ return promise.future();
}
list<Action> actions;
@@ -600,7 +600,7 @@ process::Promise<list<Action> > ReplicaP
if (result.isError()) {
process::Promise<list<Action> > promise;
promise.fail(result.error());
- return promise;
+ return promise.future();
} else if (result.isSome()) {
actions.push_back(result.get());
}
@@ -688,7 +688,7 @@ void ReplicaProcess::promise(const Promi
response.set_okay(true);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
}
} else {
CHECK(result.isSome());
@@ -700,7 +700,7 @@ void ReplicaProcess::promise(const Promi
response.set_okay(false);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
} else {
Action original = action;
action.set_promised(request.id());
@@ -710,7 +710,7 @@ void ReplicaProcess::promise(const Promi
response.set_okay(true);
response.set_id(request.id());
response.mutable_action()->MergeFrom(original);
- send(from(), response);
+ reply(response);
}
}
}
@@ -722,7 +722,7 @@ void ReplicaProcess::promise(const Promi
PromiseResponse response;
response.set_okay(false);
response.set_id(request.id());
- send(from(), response);
+ reply(response);
} else {
Promise promise;
promise.set_id(request.id());
@@ -735,7 +735,7 @@ void ReplicaProcess::promise(const Promi
response.set_okay(true);
response.set_id(request.id());
response.set_position(end);
- send(from(), response);
+ reply(response);
}
}
}
@@ -757,7 +757,7 @@ void ReplicaProcess::write(const WriteRe
response.set_okay(false);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
} else {
Action action;
action.set_position(request.position());
@@ -788,7 +788,7 @@ void ReplicaProcess::write(const WriteRe
response.set_okay(true);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
}
}
} else if (result.isSome()) {
@@ -800,7 +800,7 @@ void ReplicaProcess::write(const WriteRe
response.set_okay(false);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
} else {
// TODO(benh): Check if this position has already been learned,
// and if so, check that we are re-writing the same value!
@@ -835,7 +835,7 @@ void ReplicaProcess::write(const WriteRe
response.set_okay(true);
response.set_id(request.id());
response.set_position(request.position());
- send(from(), response);
+ reply(response);
}
}
}
@@ -871,11 +871,11 @@ void ReplicaProcess::learn(uint64_t posi
LearnResponse response;
response.set_okay(true);
response.mutable_action()->MergeFrom(result.get());
- send(from(), response);
+ reply(response);
} else {
LearnResponse response;
response.set_okay(false);
- send(from(), response);
+ reply(response);
}
}
Modified: incubator/mesos/trunk/src/master/frameworks_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.cpp Fri Jan 27 01:25:13 2012
@@ -27,7 +27,9 @@
using std::map;
-namespace mesos { namespace internal { namespace master {
+namespace mesos {
+namespace internal {
+namespace master {
// Constructor: Initializes storage and resets cached flag.
FrameworksManager::FrameworksManager(FrameworksStorage* _storage)
@@ -39,8 +41,8 @@ FrameworksManager::FrameworksManager(Fra
Result<map<FrameworkID, FrameworkInfo> > FrameworksManager::list()
{
if (!cache()) {
- return Result<map<FrameworkID,
- FrameworkInfo> >::error("Error caching framework infos.");
+ return Result<map<FrameworkID, FrameworkInfo> >
+ ::error("Error caching framework infos.");
}
map<FrameworkID, FrameworkInfo> result;
@@ -68,23 +70,27 @@ Result<bool> FrameworksManager::add(cons
// return Result<bool>::error("Error caching framework infos.");
// }
//
- // if(infos.count(id)){
+ // if (infos.count(id) > 0) {
// LOG(INFO) << "Duplicate framework detected...id: " << id;
//
- // if(infos[id].first. == info)
+ // if (infos[id].first == info)
// {
// LOG(INFO) << "Duplicate framework information detected...returning.";
// return Result<bool>::some(true);
// }
// }
- Result<bool> result =
- call(storage, &FrameworksStorage::add, id, info);
+ Future<Result<bool> > result =
+ dispatch(storage, &FrameworksStorage::add, id, info);
- if (result.isError()) {
+ result.await();
+
+ CHECK(result.isReady());
+
+ if (result.get().isError()) {
LOG(ERROR) << "Error adding framework to underlying storage: "
- << result.error();
- return result;
+ << result.get().error();
+ return result.get();
}
infos[id] = std::make_pair(info, Option<double>::none());
@@ -93,26 +99,26 @@ Result<bool> FrameworksManager::add(cons
}
-// Remove a framework after a delay.
-// Actually sends a message to self after the given delay.
-Promise<Result<bool> > FrameworksManager::remove(const FrameworkID& id,
- double delay_secs)
+Future<Result<bool> > FrameworksManager::remove(const FrameworkID& id,
+ const seconds& s)
{
if (!cache()) {
return Result<bool>::error("Error caching framework infos.");
}
- if(!infos.count(id)) {
+ if (infos.count(id) == 0) {
LOG(INFO) << "Can't remove non-existent Framework: " << id;
return Result<bool>::error("Error removing non-existing framework.");
}
+ LOG(INFO) << "Expiring framework " << id << " in " << s.value << " seconds";
+
// Set the option to contain the firing time of the message.
- infos[id].second = Option<double>::some(elapsedTime() + delay_secs);
+ infos[id].second = Option<double>::some(Clock::now() + s.value);
- Promise<Result<bool> > promise;
- delay(delay_secs, self(), &FrameworksManager::expire, id, promise);
- return promise;
+ Promise<Result<bool> >* promise = new Promise<Result<bool> >();
+ delay(s.value, self(), &FrameworksManager::expire, id, promise);
+ return promise->future();
}
@@ -125,7 +131,7 @@ Result<bool> FrameworksManager::resurrec
return Result<bool>::error("Error caching framework infos.");
}
- if (infos.count(id)) {
+ if (infos.count(id) > 0) {
infos[id].second = Option<double>::none();
return true;
@@ -149,32 +155,44 @@ Result<bool> FrameworksManager::exists(c
// Actually removes the framework from the underlying storage.
// Checks for the case when the framework is being resurrected.
void FrameworksManager::expire(const FrameworkID& id,
- Promise<Result<bool> > promise)
+ Promise<Result<bool> >* promise)
{
- if (infos.count(id)) {
- Option<double>& option = infos[id].second;
-
- if (option.isSome() && elapsedTime() >= option.get()) {
- LOG(INFO) << "Removing framework " << id << " from storage";
+ if (infos.count(id) > 0) {
+ const Option<double>& option = infos[id].second;
- Result<bool> result = call(storage, &FrameworksStorage::remove, id);
-
- // If storage returns successfully remove from cache.
- if (!result.isError()) {
- infos.erase(id);
+ if (option.isSome()) {
+ if (Clock::now() >= option.get()) {
+ LOG(INFO) << "Removing framework " << id << " from storage";
+
+ Future<Result<bool> > result =
+ dispatch(storage, &FrameworksStorage::remove, id);
+
+ result.await();
+
+ CHECK(result.isReady());
+
+ // If storage returns successfully remove from cache.
+ if (!result.get().isError()) {
+ infos.erase(id);
+ }
+ promise->set(result.get());
+ delete promise;
+ return;
+ } else {
+ LOG(INFO) << "Framework appears to be resurrected, and then "
+ << "re-removed, so ignoring this delayed expire";
}
- promise.set(result);
- return;
} else {
LOG(INFO) << "Framework appears to have been "
- << "resurrected, ignoring delayed expire.";
+ << "resurrected, ignoring delayed expire";
}
} else {
LOG(INFO) << "Framework has already been removed by someone else,"
- << "ignoring delayed expire.";
+ << "ignoring delayed expire";
}
- promise.set(Result<bool>::some(false));
+ promise->set(Result<bool>::some(false));
+ delete promise;
}
@@ -182,16 +200,22 @@ void FrameworksManager::expire(const Fra
bool FrameworksManager::cache()
{
if (!cached) {
- Result<map<FrameworkID, FrameworkInfo> > result =
- call(storage, &FrameworksStorage::list);
+ LOG(INFO) << "Caching framework information";
+
+ Future<Result<map<FrameworkID, FrameworkInfo> > > result =
+ dispatch(storage, &FrameworksStorage::list);
+
+ result.await();
+
+ CHECK(result.isReady());
- if (result.isError()) {
+ if (result.get().isError()) {
LOG(ERROR) << "Error getting framework info from underlying storage: "
- << result.error();
+ << result.get().error();
return false;
}
- foreachpair (const FrameworkID& id, const FrameworkInfo& info, result.get()) {
+ foreachpair (const FrameworkID& id, const FrameworkInfo& info, result.get().get()) {
infos[id] = std::make_pair(info, Option<double>::none());
}
@@ -201,4 +225,6 @@ bool FrameworksManager::cache()
return true;
}
-}}} // namespace mesos { namespace internal { namespace master {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/src/master/frameworks_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.hpp Fri Jan 27 01:25:13 2012
@@ -27,22 +27,25 @@
#include "common/option.hpp"
#include "common/result.hpp"
+#include "common/seconds.hpp"
#include "messages/messages.hpp"
-namespace mesos { namespace internal { namespace master {
+namespace mesos {
+namespace internal {
+namespace master {
using namespace process;
class FrameworksStorage : public Process<FrameworksStorage>
{
public:
- virtual Promise<Result<std::map<FrameworkID, FrameworkInfo> > > list() = 0;
+ virtual Future<Result<std::map<FrameworkID, FrameworkInfo> > > list() = 0;
- virtual Promise<Result<bool> > add(const FrameworkID& id,
+ virtual Future<Result<bool> > add(const FrameworkID& id,
const FrameworkInfo& info) = 0;
- virtual Promise<Result<bool> > remove(const FrameworkID& id) = 0;
+ virtual Future<Result<bool> > remove(const FrameworkID& id) = 0;
};
class FrameworksManager : public Process<FrameworksManager>
@@ -57,8 +60,8 @@ public:
// Add a new framework.
Result<bool> add(const FrameworkID& id, const FrameworkInfo& info);
- // Remove a framework after a certain delay.
- Promise<Result<bool> > remove(const FrameworkID& id, double delay_secs);
+ // Remove a framework after a specified number of seconds.
+ Future<Result<bool> > remove(const FrameworkID& id, const seconds& s);
// Resurrect the framework.
Result<bool> resurrect(const FrameworkID& id);
@@ -68,7 +71,7 @@ public:
Result<bool> exists(const FrameworkID& id);
private:
- void expire(const FrameworkID& id, Promise<Result<bool> > promise);
+ void expire(const FrameworkID& id, Promise<Result<bool> >* promise);
bool cache();
@@ -82,5 +85,8 @@ private:
std::map<FrameworkID, std::pair<FrameworkInfo, Option<double> > > infos;
};
-}}} // namespace mesos { namespace internal { namespace master {
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
#endif // __MASTER_FRAMEWORKS_MANAGER_HPP__
Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Fri Jan 27 01:25:13 2012
@@ -30,9 +30,9 @@
#include "master/http.hpp"
#include "master/master.hpp"
+using process::Future;
using process::HttpResponse;
using process::HttpRequest;
-using process::Promise;
using std::string;
@@ -151,7 +151,7 @@ JSON::Object model(const Slave& slave)
namespace http {
-Promise<HttpResponse> vars(
+Future<HttpResponse> vars(
const Master& master,
const HttpRequest& request)
{
@@ -184,7 +184,7 @@ Promise<HttpResponse> vars(
namespace json {
-Promise<HttpResponse> stats(
+Future<HttpResponse> stats(
const Master& master,
const HttpRequest& request)
{
@@ -240,7 +240,7 @@ Promise<HttpResponse> stats(
}
-Promise<HttpResponse> state(
+Future<HttpResponse> state(
const Master& master,
const HttpRequest& request)
{
@@ -250,7 +250,7 @@ Promise<HttpResponse> state(
object.values["build_date"] = build::DATE;
object.values["build_user"] = build::USER;
object.values["start_time"] = master.startTime;
- object.values["id"] = master.id;
+ object.values["id"] = master.info.id();
object.values["pid"] = string(master.self());
// Model all of the slaves.
Modified: incubator/mesos/trunk/src/master/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.hpp (original)
+++ incubator/mesos/trunk/src/master/http.hpp Fri Jan 27 01:25:13 2012
@@ -33,7 +33,7 @@ namespace http {
// Returns current vars in "key value\n" format (keys do not contain
// spaces, values may contain spaces but are ended by a newline).
-process::Promise<process::HttpResponse> vars(
+process::Future<process::HttpResponse> vars(
const Master& master,
const process::HttpRequest& request);
@@ -41,13 +41,13 @@ process::Promise<process::HttpResponse>
namespace json {
// Returns current statistics of the master.
-process::Promise<process::HttpResponse> stats(
+process::Future<process::HttpResponse> stats(
const Master& master,
const process::HttpRequest& request);
// Returns current state of the cluster that the master knows about.
-process::Promise<process::HttpResponse> state(
+process::Future<process::HttpResponse> state(
const Master& master,
const process::HttpRequest& request);
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Fri Jan 27 01:25:13 2012
@@ -60,58 +60,56 @@ public:
SlaveObserver(const UPID& _slave,
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
- const PID<SlavesManager>& _slavesManager)
+ const PID<Master>& _master)
: slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
- slavesManager(_slavesManager),
+ master(_master),
timeouts(0),
- pinged(false) {}
-
- virtual ~SlaveObserver() {}
+ pinged(false)
+ {
+ install("PONG", &SlaveObserver::pong);
+ }
protected:
- virtual void operator () ()
+ virtual void initialize()
{
- // Send a ping some interval after we heard the last pong. Or if
- // we don't hear a pong, increment the number of timeouts from the
- // slave and try and send another ping. If we eventually timeout too
- // many missed pongs in a row, consider the slave dead.
send(slave, "PING");
pinged = true;
+ delay(SLAVE_PONG_TIMEOUT, self(), &SlaveObserver::timeout);
+ }
- do {
- receive(SLAVE_PONG_TIMEOUT);
- if (name() == "PONG") {
- timeouts = 0;
- pinged = false;
- } else if (name() == TIMEOUT) {
- if (pinged) {
- timeouts++;
- pinged = false;
- }
+ void pong(const UPID& from, const string& body)
+ {
+ timeouts = 0;
+ pinged = false;
+ }
- send(slave, "PING");
- pinged = true;
- } else if (name() == TERMINATE) {
+ void timeout()
+ {
+ if (pinged) { // So we haven't got back a pong yet ...
+ if (++timeouts >= MAX_SLAVE_TIMEOUTS) {
+ deactivate();
return;
}
- } while (timeouts < MAX_SLAVE_TIMEOUTS);
-
- // Tell the slave manager to deactivate the slave, this will take
- // care of updating the master too.
- while (!call(slavesManager, &SlavesManager::deactivate,
- slaveInfo.hostname(), slave.port)) {
- LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
- pause(5);
}
+
+ send(slave, "PING");
+ pinged = true;
+ delay(SLAVE_PONG_TIMEOUT, self(), &SlaveObserver::timeout);
+ }
+
+ void deactivate()
+ {
+ dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ slaveInfo.hostname(), slave.port);
}
private:
const UPID slave;
const SlaveInfo slaveInfo;
const SlaveID slaveId;
- const PID<SlavesManager> slavesManager;
+ const PID<Master> master;
int timeouts;
bool pinged;
};
@@ -134,10 +132,15 @@ struct SlaveRegistrar
const PID<Master>& master,
const PID<SlavesManager>& slavesManager)
{
- if (!call(slavesManager, &SlavesManager::add,
- slave->info.hostname(), slave->pid.port)) {
+ Future<bool> added = dispatch(slavesManager, &SlavesManager::add,
+ slave->info.hostname(), slave->pid.port);
+ added.await();
+ if (!added.isReady() || !added.get()) {
LOG(WARNING) << "Could not register slave because failed"
<< " to add it to the slaves maanger";
+ // TODO(benh): This could be because our acknowledgement to the
+ // slave was dropped, so they retried, and now we should
+ // probably send another acknowledgement.
delete slave;
return false;
}
@@ -166,10 +169,15 @@ struct SlaveReregistrar
const PID<Master>& master,
const PID<SlavesManager>& slavesManager)
{
- if (!call(slavesManager, &SlavesManager::add,
- slave->info.hostname(), slave->pid.port)) {
+ Future<bool> added = dispatch(slavesManager, &SlavesManager::add,
+ slave->info.hostname(), slave->pid.port);
+ added.await();
+ if (!added.isReady() || !added.get()) {
LOG(WARNING) << "Could not register slave because failed"
<< " to add it to the slaves maanger";
+ // TODO(benh): This could be because our acknowledgement to the
+ // slave was dropped, so they retried, and now we should
+ // probably send another acknowledgement.
delete slave;
return false;
}
@@ -182,18 +190,14 @@ struct SlaveReregistrar
Master::Master(Allocator* _allocator)
: ProcessBase("master"),
allocator(_allocator)
-{
- initialize();
-}
+{}
Master::Master(Allocator* _allocator, const Configuration& conf)
: ProcessBase("master"),
allocator(_allocator),
conf(conf)
-{
- initialize();
-}
+{}
Master::~Master()
@@ -259,24 +263,24 @@ vector<Slave*> Master::getActiveSlaves()
}
-void Master::operator () ()
+void Master::initialize()
{
LOG(INFO) << "Master started at mesos://" << self();
- // Don't do anything until we get a master token.
- while (receive() != GotMasterTokenMessage().GetTypeName()) {
- LOG(INFO) << "Oops! We're dropping a message since "
- << "we haven't received an identifier yet!";
- }
+ // The master ID is currently comprised of the current date, the IP
+ // address and port from self() and the OS PID.
+
+ Try<string> id =
+ strings::format("%s%d-%d-%d", DateUtils::currentDate().c_str(),
+ self().ip, self().port, getpid());
- GotMasterTokenMessage message;
- message.ParseFromString(body());
+ CHECK(!id.isError()) << id.error();
- // The master ID is comprised of the current date and some ephemeral
- // token (e.g., determined by ZooKeeper).
+ info.set_id(id.get());
+ info.set_ip(self().ip);
+ info.set_port(self().port);
- id = DateUtils::currentDate() + "-" + message.token();
- LOG(INFO) << "Master ID: " << id;
+ LOG(INFO) << "Master ID: " << info.id();
// Setup slave manager.
slavesManager = new SlavesManager(conf, self());
@@ -284,27 +288,6 @@ void Master::operator () ()
allocator->initialize(this);
- // Start our timer ticks.
- delay(1.0, self(), &Master::timerTick);
-
- while (true) {
- serve();
- if (name() == TERMINATE) {
- LOG(INFO) << "Asked to terminate by " << from();
- foreachvalue (Slave* slave, slaves) {
- send(slave->pid, TERMINATE);
- }
- break;
- } else {
- LOG(WARNING) << "Dropping unknown message '" << name() << "'"
- << " from: " << from();
- }
- }
-}
-
-
-void Master::initialize()
-{
elected = false;
nextFrameworkId = 0;
@@ -327,115 +310,153 @@ void Master::initialize()
stats.validFrameworkMessages = 0;
stats.invalidFrameworkMessages = 0;
- startTime = elapsedTime();
+ startTime = Clock::now();
+
+ // Start our timer ticks.
+ delay(1.0, self(), &Master::timerTick);
// Install handler functions for certain messages.
- installProtobufHandler<SubmitSchedulerRequest>(
+ install<SubmitSchedulerRequest>(
&Master::submitScheduler,
&SubmitSchedulerRequest::name);
- installProtobufHandler<NewMasterDetectedMessage>(
+ install<NewMasterDetectedMessage>(
&Master::newMasterDetected,
&NewMasterDetectedMessage::pid);
- installProtobufHandler<NoMasterDetectedMessage>(
+ install<NoMasterDetectedMessage>(
&Master::noMasterDetected);
- installProtobufHandler<RegisterFrameworkMessage>(
+ install<RegisterFrameworkMessage>(
&Master::registerFramework,
&RegisterFrameworkMessage::framework);
- installProtobufHandler<ReregisterFrameworkMessage>(
+ install<ReregisterFrameworkMessage>(
&Master::reregisterFramework,
&ReregisterFrameworkMessage::framework_id,
&ReregisterFrameworkMessage::framework,
&ReregisterFrameworkMessage::failover);
- installProtobufHandler<UnregisterFrameworkMessage>(
+ install<UnregisterFrameworkMessage>(
&Master::unregisterFramework,
&UnregisterFrameworkMessage::framework_id);
- installProtobufHandler<DeactivateFrameworkMessage>(
+ install<DeactivateFrameworkMessage>(
&Master::deactivateFramework,
&DeactivateFrameworkMessage::framework_id);
- installProtobufHandler<ResourceRequestMessage>(
+ install<ResourceRequestMessage>(
&Master::resourceRequest,
&ResourceRequestMessage::framework_id,
&ResourceRequestMessage::requests);
- installProtobufHandler<LaunchTasksMessage>(
+ install<LaunchTasksMessage>(
&Master::launchTasks,
&LaunchTasksMessage::framework_id,
&LaunchTasksMessage::offer_id,
&LaunchTasksMessage::tasks,
&LaunchTasksMessage::filters);
- installProtobufHandler<ReviveOffersMessage>(
+ install<ReviveOffersMessage>(
&Master::reviveOffers,
&ReviveOffersMessage::framework_id);
- installProtobufHandler<KillTaskMessage>(
+ install<KillTaskMessage>(
&Master::killTask,
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
- installProtobufHandler<FrameworkToExecutorMessage>(
+ install<FrameworkToExecutorMessage>(
&Master::schedulerMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
- installProtobufHandler<RegisterSlaveMessage>(
+ install<RegisterSlaveMessage>(
&Master::registerSlave,
&RegisterSlaveMessage::slave);
- installProtobufHandler<ReregisterSlaveMessage>(
+ install<ReregisterSlaveMessage>(
&Master::reregisterSlave,
&ReregisterSlaveMessage::slave_id,
&ReregisterSlaveMessage::slave,
&ReregisterSlaveMessage::executor_infos,
&ReregisterSlaveMessage::tasks);
- installProtobufHandler<UnregisterSlaveMessage>(
+ install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
&UnregisterSlaveMessage::slave_id);
- installProtobufHandler<StatusUpdateMessage>(
+ install<StatusUpdateMessage>(
&Master::statusUpdate,
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
- installProtobufHandler<ExecutorToFrameworkMessage>(
+ install<ExecutorToFrameworkMessage>(
&Master::executorMessage,
&ExecutorToFrameworkMessage::slave_id,
&ExecutorToFrameworkMessage::framework_id,
&ExecutorToFrameworkMessage::executor_id,
&ExecutorToFrameworkMessage::data);
- installProtobufHandler<ExitedExecutorMessage>(
+ install<ExitedExecutorMessage>(
&Master::exitedExecutor,
&ExitedExecutorMessage::slave_id,
&ExitedExecutorMessage::framework_id,
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::status);
- // Install some message handlers.
- installMessageHandler(EXITED, &Master::exited);
+ // Setup HTTP request handlers.
+ route("vars", bind(&http::vars, cref(*this), params::_1));
+ route("stats.json", bind(&http::json::stats, cref(*this), params::_1));
+ route("state.json", bind(&http::json::state, cref(*this), params::_1));
+}
+
- // Install HTTP request handlers.
- installHttpHandler(
- "vars",
- bind(&http::vars, cref(*this), params::_1));
-
- installHttpHandler(
- "stats.json",
- bind(&http::json::stats, cref(*this), params::_1));
-
- installHttpHandler(
- "state.json",
- bind(&http::json::state, cref(*this), params::_1));
+void Master::finalize()
+{
+ LOG(INFO) << "Master terminating";
+ foreachvalue (Slave* slave, slaves) {
+ send(slave->pid, ShutdownMessage());
+ }
+}
+
+
+void Master::exited(const UPID& pid)
+{
+ foreachvalue (Framework* framework, frameworks) {
+ if (framework->pid == pid) {
+ LOG(INFO) << "Framework " << framework->id << " disconnected";
+
+// removeFramework(framework);
+
+ // Stop sending offers here for now.
+ framework->active = false;
+
+ // Delay dispatching a message to ourselves for the timeout.
+ delay(failoverTimeout, self(),
+ &Master::frameworkFailoverTimeout,
+ framework->id, framework->reregisteredTime);
+
+ // Remove the framework's offers.
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ offer->resources());
+ removeOffer(offer);
+ }
+ return;
+ }
+ }
+
+ foreachvalue (Slave* slave, slaves) {
+ if (slave->pid == pid) {
+ LOG(INFO) << "Slave " << slave->id << " disconnected";
+ removeSlave(slave);
+ return;
+ }
+ }
}
@@ -444,7 +465,7 @@ void Master::submitScheduler(const strin
LOG(INFO) << "Scheduler submit request for " << name;
SubmitSchedulerResponse response;
response.set_okay(false);
- send(from(), response);
+ reply(response);
}
@@ -487,16 +508,16 @@ void Master::registerFramework(const Fra
}
Framework* framework =
- new Framework(frameworkInfo, newFrameworkId(), from(), elapsedTime());
+ new Framework(frameworkInfo, newFrameworkId(), from, Clock::now());
- LOG(INFO) << "Registering framework " << framework->id << " at " << from();
+ LOG(INFO) << "Registering framework " << framework->id << " at " << from;
if (framework->info.executor().uri() == "") {
LOG(INFO) << framework << " registering without an executor URI";
FrameworkErrorMessage message;
message.set_code(1);
message.set_message("No executor URI given");
- send(from(), message);
+ reply(message);
delete framework;
return;
}
@@ -509,7 +530,7 @@ void Master::registerFramework(const Fra
FrameworkErrorMessage message;
message.set_code(1);
message.set_message("User 'root' is not allowed to run frameworks");
- send(from(), message);
+ reply(message);
delete framework;
return;
}
@@ -533,7 +554,7 @@ void Master::reregisterFramework(const F
FrameworkErrorMessage message;
message.set_code(1);
message.set_message("Missing framework id");
- send(from(), message);
+ reply(message);
return;
}
@@ -543,12 +564,12 @@ void Master::reregisterFramework(const F
FrameworkErrorMessage message;
message.set_code(1);
message.set_message("No executor URI given");
- send(from(), message);
+ reply(message);
return;
}
LOG(INFO) << "Re-registering framework " << frameworkId
- << " at " << from();
+ << " at " << from;
if (frameworks.count(frameworkId) > 0) {
// Using the "failover" of the scheduler allows us to keep a
@@ -567,7 +588,7 @@ void Master::reregisterFramework(const F
// TODO: Should we check whether the new scheduler has given
// us a different framework name, user name or executor info?
LOG(INFO) << "Framework " << frameworkId << " failed over";
- failoverFramework(framework, from());
+ failoverFramework(framework, from);
} else {
LOG(INFO) << "Allowing the Framework " << frameworkId
<< " to re-register with an already used id";
@@ -585,7 +606,7 @@ void Master::reregisterFramework(const F
FrameworkReregisteredMessage message;
message.mutable_framework_id()->MergeFrom(frameworkId);
- send(from(), message);
+ reply(message);
return;
}
} else {
@@ -594,7 +615,7 @@ void Master::reregisterFramework(const F
// failed-over one is connecting. Create a Framework object and add
// any tasks it has that have been reported by reconnecting slaves.
Framework* framework =
- new Framework(frameworkInfo, frameworkId, from(), elapsedTime());
+ new Framework(frameworkInfo, frameworkId, from, Clock::now());
// TODO(benh): Check for root submissions like above!
@@ -626,7 +647,7 @@ void Master::reregisterFramework(const F
foreachvalue (Slave* slave, slaves) {
UpdateFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(frameworkId);
- message.set_pid(from());
+ message.set_pid(from);
send(slave->pid, message);
}
}
@@ -638,10 +659,10 @@ void Master::unregisterFramework(const F
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- if (framework->pid == from()) {
+ if (framework->pid == from) {
removeFramework(framework);
} else {
- LOG(WARNING) << from() << " tried to unregister framework; "
+ LOG(WARNING) << from << " tried to unregister framework; "
<< "expecting " << framework->pid;
}
}
@@ -654,10 +675,10 @@ void Master::deactivateFramework(const F
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- if (framework->pid == from()) {
+ if (framework->pid == from) {
framework->active = false;
} else {
- LOG(WARNING) << from() << " tried to deactivate framework; "
+ LOG(WARNING) << from << " tried to deactivate framework; "
<< "expecting " << framework->pid;
}
}
@@ -696,6 +717,7 @@ void Master::launchTasks(const Framework
// to same offer, etc). Report all tasks in it as failed.
// TODO: Consider adding a new task state TASK_INVALID for
// situations like these.
+ LOG(WARNING) << "Offer " << offerId << " is no longer valid";
foreach (const TaskDescription& task, tasks) {
StatusUpdateMessage message;
StatusUpdate* update = message.mutable_update();
@@ -704,7 +726,7 @@ void Master::launchTasks(const Framework
status->mutable_task_id()->MergeFrom(task.task_id());
status->set_state(TASK_LOST);
status->set_message("Task launched with invalid offer");
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
}
@@ -762,7 +784,7 @@ void Master::killTask(const FrameworkID&
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(TASK_LOST);
status->set_message("Task not found");
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
}
@@ -812,22 +834,39 @@ void Master::registerSlave(const SlaveIn
return;
}
- Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsedTime());
+ // Check if this slave has already registered (because it's retrying).
+ foreachvalue (Slave* slave, slaves) {
+ if (slave->pid == from) {
+ LOG(INFO) << "Slave " << slave->id << " (" << slave->info.hostname()
+ << ") already registered, re-sending acknowledgement";
+ SlaveRegisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+ return;
+ }
+ }
+
+ Slave* slave = new Slave(slaveInfo, newSlaveId(), from, Clock::now());
LOG(INFO) << "Attempting to register slave " << slave->id
<< " at " << slave->pid;
- // Checks if this slave, or if all slaves, can be accepted.
- if (slaveHostnamePorts.contains(slaveInfo.hostname(), from().port)) {
- run(&SlaveRegistrar::run, slave, self());
- } else if (conf.get<string>("slaves", "*") == "*") {
- run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
- } else {
- LOG(WARNING) << "Cannot register slave at "
- << slaveInfo.hostname() << ":" << from().port
- << " because not in allocated set of slaves!";
- send(from(), TERMINATE);
- }
+ // TODO(benh): We assume all slaves can register for now.
+ CHECK(conf.get<string>("slaves", "*") == "*");
+ activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
+ addSlave(slave);
+
+// // Checks if this slave, or if all slaves, can be accepted.
+// if (slaveHostnamePorts.contains(slaveInfo.hostname(), from.port)) {
+// run(&SlaveRegistrar::run, slave, self());
+// } else if (conf.get<string>("slaves", "*") == "*") {
+// run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+// } else {
+// LOG(WARNING) << "Cannot register slave at "
+// << slaveInfo.hostname() << ":" << from.port
+// << " because not in allocated set of slaves!";
+// reply(ShutdownMessage());
+// }
}
@@ -843,18 +882,18 @@ void Master::reregisterSlave(const Slave
if (slaveId == "") {
LOG(ERROR) << "Slave re-registered without an id!";
- send(from(), TERMINATE);
+ reply(ShutdownMessage());
} else {
Slave* slave = getSlave(slaveId);
if (slave != NULL) {
- // NOTE: This handles the case where a slave tries to re-register with an
- // existing master (e.g. because of a spurious zookeeper session
- // expiration.)
- // For now, we assume this slave is not nefarious (eventually this will
- // be handled by orthogonal security measures like key based
- // authentication).
+ // NOTE: This handles the case where a slave tries to
+ // re-register with an existing master (e.g. because of a
+ // spurious zookeeper session expiration).
+ // For now, we assume this slave is not nefarious (eventually
+ // this will be handled by orthogonal security measures like key
+ // based authentication).
- LOG(WARNING) << "Slave at " << from()
+ LOG(WARNING) << "Slave at " << from
<< " is being allowed to re-register with an already"
<< " in use id (" << slaveId << ")";
@@ -863,23 +902,28 @@ void Master::reregisterSlave(const Slave
send(slave->pid, message);
} else {
- Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsedTime());
+ Slave* slave = new Slave(slaveInfo, slaveId, from, Clock::now());
LOG(INFO) << "Attempting to re-register slave " << slave->id
<< " at " << slave->pid;
- // Checks if this slave, or if all slaves, can be accepted.
- if (slaveHostnamePorts.contains(slaveInfo.hostname(), from().port)) {
- run(&SlaveReregistrar::run, slave, executorInfos, tasks, self());
- } else if (conf.get<string>("slaves", "*") == "*") {
- run(&SlaveReregistrar::run,
- slave, executorInfos, tasks, self(), slavesManager->self());
- } else {
- LOG(WARNING) << "Cannot re-register slave at "
- << slaveInfo.hostname() << ":" << from().port
- << " because not in allocated set of slaves!";
- send(from(), TERMINATE);
- }
+ // TODO(benh): We assume all slaves can register for now.
+ CHECK(conf.get<string>("slaves", "*") == "*");
+ activatedSlaveHostnamePort(slave->info.hostname(), slave->pid.port);
+ readdSlave(slave, executorInfos, tasks);
+
+// // Checks if this slave, or if all slaves, can be accepted.
+// if (slaveHostnamePorts.contains(slaveInfo.hostname(), from.port)) {
+// run(&SlaveReregistrar::run, slave, executorInfos, tasks, self());
+// } else if (conf.get<string>("slaves", "*") == "*") {
+// run(&SlaveReregistrar::run,
+// slave, executorInfos, tasks, self(), slavesManager->self());
+// } else {
+// LOG(WARNING) << "Cannot re-register slave at "
+// << slaveInfo.hostname() << ":" << from.port
+// << " because not in allocated set of slaves!";
+// reply(ShutdownMessage());
+// }
}
}
}
@@ -902,7 +946,7 @@ void Master::statusUpdate(const StatusUp
{
const TaskStatus& status = update.status();
- LOG(INFO) << "Status update from " << from()
+ LOG(INFO) << "Status update from " << from
<< ": task " << status.task_id()
<< " of framework " << update.framework_id()
<< " is now in state " << status.state();
@@ -934,19 +978,19 @@ void Master::statusUpdate(const StatusUp
stats.validStatusUpdates++;
} else {
- LOG(WARNING) << "Status update from " << from()
+ LOG(WARNING) << "Status update from " << from
<< ": error, couldn't lookup "
<< "task " << status.task_id();
stats.invalidStatusUpdates++;
}
} else {
- LOG(WARNING) << "Status update from " << from()
+ LOG(WARNING) << "Status update from " << from
<< ": error, couldn't lookup "
<< "framework " << update.framework_id();
stats.invalidStatusUpdates++;
}
} else {
- LOG(WARNING) << "Status update from " << from()
+ LOG(WARNING) << "Status update from " << from
<< ": error, couldn't lookup slave "
<< update.slave_id();
stats.invalidStatusUpdates++;
@@ -1030,7 +1074,7 @@ void Master::exitedExecutor(const SlaveI
status->mutable_task_id()->MergeFrom(task->task_id());
status->set_state(TASK_LOST);
status->set_message("Lost executor");
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
@@ -1074,7 +1118,7 @@ void Master::deactivatedSlaveHostnamePor
LOG(WARNING) << "Removing slave " << slave->id << " at "
<< hostname << ":" << port
<< " because it has been deactivated";
- send(slave->pid, TERMINATE);
+ send(slave->pid, ShutdownMessage());
removeSlave(slave);
break;
}
@@ -1091,7 +1135,7 @@ void Master::timerTick()
{
// Check which framework filters can be expired.
foreachvalue (Framework* framework, frameworks) {
- framework->removeExpiredFilters(elapsedTime());
+ framework->removeExpiredFilters(Clock::now());
}
// Do allocations!
@@ -1115,43 +1159,6 @@ void Master::frameworkFailoverTimeout(co
}
-void Master::exited()
-{
- foreachvalue (Framework* framework, frameworks) {
- if (framework->pid == from()) {
- LOG(INFO) << "Framework " << framework->id << " disconnected";
-
-// removeFramework(framework);
-
- // Stop sending offers here for now.
- framework->active = false;
-
- // Delay dispatching a message to ourselves for the timeout.
- delay(failoverTimeout, self(),
- &Master::frameworkFailoverTimeout,
- framework->id, framework->reregisteredTime);
-
- // Remove the framework's offers.
- foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->resourcesRecovered(offer->framework_id(),
- offer->slave_id(),
- offer->resources());
- removeOffer(offer);
- }
- return;
- }
- }
-
- foreachvalue (Slave* slave, slaves) {
- if (slave->pid == from()) {
- LOG(INFO) << "Slave " << slave->id << " disconnected";
- removeSlave(slave);
- return;
- }
- }
-}
-
-
void Master::makeOffers(Framework* framework,
const hashmap<Slave*, Resources>& offered)
{
@@ -1368,6 +1375,7 @@ void Master::processTasks(Offer* offer,
usedResources += launchTask(task, framework, slave);
} else {
// Error validating task, send a failed status update.
+ LOG(WARNING) << "Error validating task: " << error.get();
StatusUpdateMessage message;
StatusUpdate* update = message.mutable_update();
update->mutable_framework_id()->MergeFrom(framework->id);
@@ -1375,7 +1383,7 @@ void Master::processTasks(Offer* offer,
status->mutable_task_id()->MergeFrom(task.task_id());
status->set_state(TASK_LOST);
status->set_message(error.get());
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
}
@@ -1414,7 +1422,7 @@ void Master::processTasks(Offer* offer,
<< " for framework " << framework->id
<< " for " << timeout << " seconds";
framework->slaveFilter[slave] =
- (timeout == -1) ? 0 : elapsedTime() + timeout;
+ (timeout == -1) ? 0 : Clock::now() + timeout;
}
removeOffer(offer);
@@ -1518,7 +1526,7 @@ void Master::failoverFramework(Framework
// Make sure we can get offers again.
framework->active = true;
- framework->reregisteredTime = elapsedTime();
+ framework->reregisteredTime = Clock::now();
{
FrameworkRegisteredMessage message;
@@ -1582,7 +1590,7 @@ void Master::removeFramework(Framework*
// TODO(benh): unlink(framework->pid);
- framework->unregisteredTime = elapsedTime();
+ framework->unregisteredTime = Clock::now();
completedFrameworks.push_back(*framework);
@@ -1626,7 +1634,7 @@ void Master::addSlave(Slave* slave, bool
// Set up an observer for the slave.
slave->observer = new SlaveObserver(slave->pid, slave->info,
- slave->id, slavesManager->self());
+ slave->id, self());
spawn(slave->observer);
allocator->slaveAdded(slave);
@@ -1717,7 +1725,7 @@ void Master::removeSlave(Slave* slave)
status->mutable_task_id()->MergeFrom(task->task_id());
status->set_state(TASK_LOST);
status->set_message("Slave removed");
- update->set_timestamp(elapsedTime());
+ update->set_timestamp(Clock::now());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
}
@@ -1853,7 +1861,7 @@ FrameworkID Master::newFrameworkId()
{
std::ostringstream out;
- out << id << "-" << std::setw(4)
+ out << info.id() << "-" << std::setw(4)
<< std::setfill('0') << nextFrameworkId++;
FrameworkID frameworkId;
@@ -1866,7 +1874,7 @@ FrameworkID Master::newFrameworkId()
OfferID Master::newOfferId()
{
OfferID offerId;
- offerId.set_value(id + "-" + utils::stringify(nextOfferId++));
+ offerId.set_value(info.id() + "-" + utils::stringify(nextOfferId++));
return offerId;
}
@@ -1874,7 +1882,7 @@ OfferID Master::newOfferId()
SlaveID Master::newSlaveId()
{
SlaveID slaveId;
- slaveId.set_value(id + "-" + utils::stringify(nextSlaveId++));
+ slaveId.set_value(info.id() + "-" + utils::stringify(nextSlaveId++));
return slaveId;
}
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1236485&r1=1236484&r2=1236485&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Fri Jan 27 01:25:13 2012
@@ -108,7 +108,6 @@ public:
void timerTick();
void frameworkFailoverTimeout(const FrameworkID& frameworkId,
double reregisteredTime);
- void exited();
// Return connected frameworks that are not in the process of being removed
std::vector<Framework*> getActiveFrameworks() const;
@@ -120,9 +119,9 @@ public:
const hashmap<Slave*, Resources>& offered);
protected:
- virtual void operator () ();
-
- void initialize();
+ virtual void initialize();
+ virtual void finalize();
+ virtual void exited(const UPID& pid);
// Process a launch tasks request (for a non-cancelled offer) by
// launching the desired tasks (if the offer contains a valid set of
@@ -184,15 +183,15 @@ private:
// Http handlers, friends of the master in order to access state,
// they get invoked from within the master so there is no need to
// use synchronization mechanisms to protect state.
- friend Promise<HttpResponse> http::vars(
+ friend Future<HttpResponse> http::vars(
const Master& master,
const HttpRequest& request);
- friend Promise<HttpResponse> http::json::stats(
+ friend Future<HttpResponse> http::json::stats(
const Master& master,
const HttpRequest& request);
- friend Promise<HttpResponse> http::json::state(
+ friend Future<HttpResponse> http::json::state(
const Master& master,
const HttpRequest& request);
@@ -203,10 +202,7 @@ private:
Allocator* allocator;
SlavesManager* slavesManager;
- // Contains the date the master was launched and some ephemeral
- // token (e.g. returned from ZooKeeper). Used in framework and slave
- // IDs created by this master.
- std::string id;
+ MasterInfo info;
multihashmap<std::string, uint16_t> slaveHostnamePorts;