You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC

svn commit: r1140024 [6/15] - in /incubator/mesos/trunk: ./ ec2/ ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/ include/mesos/ src/ src/common/ src/configurator/ src/detector/ src/examples/ src/examples/java/ src/examples/python/...

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Mon Jun 27 06:08:33 2011
@@ -1,308 +1,167 @@
 #ifndef __SLAVE_HPP__
 #define __SLAVE_HPP__
 
-#include <dirent.h>
-#include <libgen.h>
-#include <netdb.h>
-#include <pwd.h>
-#include <signal.h>
-#include <stdio.h>
-#include <strings.h>
-
-#include <arpa/inet.h>
-
-#include <glog/logging.h>
-
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <iostream>
-#include <list>
-#include <sstream>
-#include <set>
-#include <vector>
-
-#include <boost/lexical_cast.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/unordered_map.hpp>
-
 #include <process/process.hpp>
+#include <process/protobuf.hpp>
 
 #include "isolation_module.hpp"
 #include "state.hpp"
 
-#include "common/build.hpp"
-#include "common/fatal.hpp"
-#include "common/foreach.hpp"
 #include "common/resources.hpp"
-#include "common/type_utils.hpp"
+#include "common/hashmap.hpp"
+#include "common/uuid.hpp"
 
 #include "configurator/configurator.hpp"
 
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
 
 
 namespace mesos { namespace internal { namespace slave {
 
-using foreach::_;
-
-
-const double STATUS_UPDATE_RETRY_TIMEOUT = 10;
-
-
-// Information describing an executor (goes away if executor crashes).
-struct Executor
-{
-  Executor(const FrameworkID& _frameworkId, const ExecutorInfo& _info)
-    : frameworkId(_frameworkId), info(_info), pid(process::UPID()) {}
-
-  ~Executor()
-  {
-    // Delete the tasks.
-    foreachpair (_, Task *task, tasks) {
-      delete task;
-    }
-  }
-
-  Task* addTask(const TaskDescription& task)
-  {
-    // The master should enforce unique task IDs, but just in case
-    // maybe we shouldn't make this a fatal error.
-    CHECK(tasks.count(task.task_id()) == 0);
-
-    Task *t = new Task();
-    t->mutable_framework_id()->MergeFrom(frameworkId);
-    t->mutable_executor_id()->MergeFrom(info.executor_id());
-    t->set_state(TASK_STARTING);
-    t->set_name(task.name());
-    t->mutable_task_id()->MergeFrom(task.task_id());
-    t->mutable_slave_id()->MergeFrom(task.slave_id());
-    t->mutable_resources()->MergeFrom(task.resources());
-
-    tasks[task.task_id()] = t;
-    resources += task.resources();
-  }
-
-  void removeTask(const TaskID& taskId)
-  {
-    // Remove task from the queue if it's queued
-    for (std::list<TaskDescription>::iterator it = queuedTasks.begin();
-	 it != queuedTasks.end(); ++it) {
-      if ((*it).task_id() == taskId) {
-	queuedTasks.erase(it);
-	break;
-      }
-    }
-
-    // Remove it from tasks as well.
-    if (tasks.count(taskId) > 0) {
-      Task* task = tasks[taskId];
-      foreach (const Resource& resource, task->resources()) {
-        resources -= resource;
-      }
-      tasks.erase(taskId);
-      delete task;
-    }
-  }
-
-  void updateTaskState(const TaskID& taskId, TaskState state)
-  {
-    if (tasks.count(taskId) > 0) {
-      tasks[taskId]->set_state(state);
-    }
-  }
-
-  const FrameworkID frameworkId;
-  const ExecutorInfo info;
-
-  process::UPID pid;
+using namespace process;
 
-  std::list<TaskDescription> queuedTasks;
-  boost::unordered_map<TaskID, Task*> tasks;
+struct Framework;
+struct Executor;
 
-  Resources resources;
+// TODO(benh): Also make configuration options be constants.
 
-  // Information about the status of the executor for this framework, set by
-  // the isolation module. For example, this might include a PID, a VM ID, etc.
-  std::string executorStatus;
-};
+const double EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5.0;
+const double STATUS_UPDATE_RETRY_INTERVAL_SECONDS = 10.0;
 
 
-// Information about a framework.
-struct Framework
-{
-  Framework(const FrameworkID& _frameworkId, const FrameworkInfo& _info,
-            const process::UPID& _pid)
-    : frameworkId(_frameworkId), info(_info), pid(_pid) {}
-
-  ~Framework() {}
-
-  Executor* createExecutor(const ExecutorInfo& info)
-  {
-    Executor* executor = new Executor(frameworkId, info);
-    CHECK(executors.count(info.executor_id()) == 0);
-    executors[info.executor_id()] = executor;
-    return executor;
-  }
-
-  void destroyExecutor(const ExecutorID& executorId)
-  {
-    if (executors.count(executorId) > 0) {
-      Executor* executor = executors[executorId];
-      executors.erase(executorId);
-      delete executor;
-    }
-  }
-
-  Executor* getExecutor(const ExecutorID& executorId)
-  {
-    if (executors.count(executorId) > 0) {
-      return executors[executorId];
-    }
-
-    return NULL;
-  }
-
-  Executor* getExecutor(const TaskID& taskId)
-  {
-    foreachpair (_, Executor* executor, executors) {
-      if (executor->tasks.count(taskId) > 0) {
-        return executor;
-      }
-    }
-
-    return NULL;
-  }
-
-  const FrameworkID frameworkId;
-  const FrameworkInfo info;
-
-  process::UPID pid;
-
-  boost::unordered_map<ExecutorID, Executor*> executors;
-  boost::unordered_map<double, boost::unordered_map<TaskID, TaskStatus> > statuses;
-};
-
-
-class Slave : public MesosProcess<Slave>
+// Slave process.
+class Slave : public ProtobufProcess<Slave>
 {
 public:
-  Slave(const Resources& resources, bool local,
-        IsolationModule* isolationModule);
-
-  Slave(const Configuration& conf, bool local,
+  Slave(const Configuration& conf,
+        bool local,
         IsolationModule *isolationModule);
 
-  virtual ~Slave();
-
-  static void registerOptions(Configurator* conf);
-
-  process::Promise<state::SlaveState*> getState();
-
-  // Callback used by isolation module to tell us when an executor
-  // exits.
-  void executorExited(const FrameworkID& frameworkId,
-                      const ExecutorID& executorId,
-                      int result);
+  Slave(const Resources& resources,
+        bool local,
+        IsolationModule* isolationModule);
 
-  // Kill a framework (possibly killing its executor).
-  void killFramework(Framework *framework, bool killExecutors = true);
+  virtual ~Slave();
 
-  // Helper function for generating a unique work directory for this
-  // framework/executor pair (non-trivial since a framework/executor
-  // pair may be launched more than once on the same slave).
-  std::string getUniqueWorkDirectory(
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId);
+  static void registerOptions(Configurator* configurator);
 
-  const Configuration& getConfiguration();
+  Promise<state::SlaveState*> getState();
 
-  void newMasterDetected(const std::string& pid);
+  void newMasterDetected(const UPID& pid);
   void noMasterDetected();
   void masterDetectionFailure();
-  void registerReply(const SlaveID& slaveId);
-  void reregisterReply(const SlaveID& slaveId);
+  void registered(const SlaveID& slaveId);
+  void reregistered(const SlaveID& slaveId);
   void runTask(const FrameworkInfo& frameworkInfo,
                const FrameworkID& frameworkId,
                const std::string& pid,
                const TaskDescription& task);
   void killTask(const FrameworkID& frameworkId,
                 const TaskID& taskId);
-  void killFramework(const FrameworkID& frameworkId);
+  void shutdownFramework(const FrameworkID& frameworkId);
   void schedulerMessage(const SlaveID& slaveId,
 			const FrameworkID& frameworkId,
 			const ExecutorID& executorId,
 			const std::string& data);
   void updateFramework(const FrameworkID& frameworkId,
                        const std::string& pid);
-  void statusUpdateAck(const FrameworkID& frameworkId,
-                       const SlaveID& slaveId,
-                       const TaskID& taskId);
+  void statusUpdateAcknowledgement(const SlaveID& slaveId,
+                                   const FrameworkID& frameworkId,
+                                   const TaskID& taskId,
+                                   const std::string& uuid);
   void registerExecutor(const FrameworkID& frameworkId,
                         const ExecutorID& executorId);
-  void statusUpdate(const FrameworkID& frameworkId,
-                    const TaskStatus& status);
+  void statusUpdate(const StatusUpdate& update);
   void executorMessage(const SlaveID& slaveId,
-		       const FrameworkID& frameworkId,
-		       const ExecutorID& executorId,
-		       const std::string& data);
+                       const FrameworkID& frameworkId,
+                       const ExecutorID& executorId,
+                       const std::string& data);
   void ping();
-  void timeout();
   void exited();
 
-  // TODO(...): Don't make these instance variables public! Hack for
-  // now because they are needed in the isolation modules.
-  bool local;
-  SlaveID slaveId;
+  void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
+
+  void executorStarted(const FrameworkID& frameworkId,
+                       const ExecutorID& executorId,
+                       pid_t pid);
+
+  void executorExited(const FrameworkID& frameworkId,
+                      const ExecutorID& executorId,
+                      int status);
 
 protected:
   virtual void operator () ();
 
   void initialize();
 
+  // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
 
-  // Send any tasks queued up for the given framework to its executor
-  // (needed if we received tasks while the executor was starting up).
-  void sendQueuedTasks(Framework* framework, Executor* executor);
+  // Shut down an executor. This is a two phase process. First, an
+  // executor receives a shut down message (shut down phase), then
+  // after a configurable timeout the slave actually forces a kill
+  // (kill phase, via the isolation module) if the executor has not
+  // exited.
+  void shutdownExecutor(Framework* framework, Executor* executor);
+
+  // Handle the second phase of shutting down an executor for those
+  // executors that have not properly shutdown within a timeout.
+  void shutdownExecutorTimeout(const FrameworkID& frameworkId,
+                               const ExecutorID& executorId,
+                               const UUID& uuid);
+
+//   // Create a new status update stream.
+//   StatusUpdates* createStatusUpdateStream(const StatusUpdateStreamID& streamId,
+//                                           const string& directory);
+
+//   StatusUpdates* getStatusUpdateStream(const StatusUpdateStreamID& streamId);
+
+  // Helper function for generating a unique work directory for this
+  // framework/executor pair (non-trivial since a framework/executor
+  // pair may be launched more than once on the same slave).
+  std::string getUniqueWorkDirectory(const FrameworkID& frameworkId,
+                                     const ExecutorID& executorId);
 
 private:
   // TODO(benh): Better naming and name scope for these http handlers.
-  process::Promise<process::HttpResponse> http_info_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_frameworks_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_tasks_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_stats_json(const process::HttpRequest& request);
-  process::Promise<process::HttpResponse> http_vars(const process::HttpRequest& request);
+  Promise<HttpResponse> http_info_json(const HttpRequest& request);
+  Promise<HttpResponse> http_frameworks_json(const HttpRequest& request);
+  Promise<HttpResponse> http_tasks_json(const HttpRequest& request);
+  Promise<HttpResponse> http_stats_json(const HttpRequest& request);
+  Promise<HttpResponse> http_vars(const HttpRequest& request);
 
   const Configuration conf;
 
-  SlaveInfo slave;
+  bool local;
+
+  SlaveID id;
+  SlaveInfo info;
+
+  UPID master;
 
-  process::UPID master;
   Resources resources;
 
-  // Invariant: framework will exist if executor exists.
-  boost::unordered_map<FrameworkID, Framework*> frameworks;
+  hashmap<FrameworkID, Framework*> frameworks;
 
-  IsolationModule *isolationModule;
+  IsolationModule* isolationModule;
 
   // Statistics (initialized in Slave::initialize).
   struct {
-    uint64_t launched_tasks;
-    uint64_t finished_tasks;
-    uint64_t killed_tasks;
-    uint64_t failed_tasks;
-    uint64_t lost_tasks;
-    uint64_t valid_status_updates;
-    uint64_t invalid_status_updates;
-    uint64_t valid_framework_messages;
-    uint64_t invalid_framework_messages;
-  } statistics;
+    uint64_t tasks[TaskState_ARRAYSIZE];
+    uint64_t validStatusUpdates;
+    uint64_t invalidStatusUpdates;
+    uint64_t validFrameworkMessages;
+    uint64_t invalidFrameworkMessages;
+  } stats;
 
   double startTime;
+
+//   typedef std::pair<FrameworkID, TaskID> StatusUpdateStreamID;
+//   hashmap<std::pair<FrameworkID, TaskID>, StatusUpdateStream*> statusUpdateStreams;
+
+//   hashmap<std::pair<FrameworkID, TaskID>, PendingStatusUpdate> pendingUpdates;
 };
 
 }}}
 
-#endif /* __SLAVE_HPP__ */
+#endif // __SLAVE_HPP__

Modified: incubator/mesos/trunk/src/slave/solaris_project_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/solaris_project_isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/solaris_project_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/solaris_project_isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -28,7 +28,7 @@ protected:
 
   public:
     Communicator(SolarisProjectIsolationModule* module);
-    void operator() ();
+    virtual void operator() ();
     void stop();
 
   private:

Modified: incubator/mesos/trunk/src/slave/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/state.hpp (original)
+++ incubator/mesos/trunk/src/slave/state.hpp Mon Jun 27 06:08:33 2011
@@ -15,7 +15,7 @@ namespace mesos { namespace internal { n
 struct Task
 {
   Task(std::string id_, const std::string& name_, std::string state_,
-      int32_t cpus_, int64_t mem_)
+      double cpus_, double mem_)
     : id(id_), name(name_), state(state_), cpus(cpus_), mem(mem_) {}
 
   Task() {}
@@ -23,15 +23,15 @@ struct Task
   std::string id;
   std::string name;
   std::string state;
-  int32_t cpus;
-  int64_t mem;
+  double cpus;
+  double mem;
 };
 
 struct Framework
 {
   Framework(std::string id_, const std::string& name_,
       const std::string& executor_uri_, const std::string& executor_status_,
-      int32_t cpus_, int64_t mem_)
+      double cpus_, double mem_)
     : id(id_), name(name_), executor_uri(executor_uri_),
       executor_status(executor_status_), cpus(cpus_), mem(mem_) {}
 
@@ -47,8 +47,8 @@ struct Framework
   std::string name;
   std::string executor_uri;
   std::string executor_status;
-  int32_t cpus;
-  int64_t mem;
+  double cpus;
+  double mem;
 
   std::vector<Task *> tasks;
 };
@@ -56,7 +56,7 @@ struct Framework
 struct SlaveState
 {
   SlaveState(const std::string& build_date_, const std::string& build_user_,
-	     std::string id_, int32_t cpus_, int64_t mem_, const std::string& pid_,
+	     std::string id_, double cpus_, double mem_, const std::string& pid_,
 	     const std::string& master_pid_)
     : build_date(build_date_), build_user(build_user_), id(id_),
       cpus(cpus_), mem(mem_), pid(pid_), master_pid(master_pid_) {}
@@ -72,8 +72,8 @@ struct SlaveState
   std::string build_date;
   std::string build_user;
   std::string id;
-  int32_t cpus;
-  int64_t mem;
+  double cpus;
+  double mem;
   std::string pid;
   std::string master_pid;
 

Modified: incubator/mesos/trunk/src/slave/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/webui.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/webui.cpp (original)
+++ incubator/mesos/trunk/src/slave/webui.cpp Mon Jun 27 06:08:33 2011
@@ -3,31 +3,34 @@
 #include <sstream>
 #include <string>
 
+#include <process/dispatch.hpp>
+
 #include "state.hpp"
 #include "webui.hpp"
 
+#include "configurator/configuration.hpp"
+
 #ifdef MESOS_WEBUI
 
 #include <Python.h>
 
+using process::PID;
+
 using std::string;
 
 
 extern "C" void init_slave();  // Initializer for the Python slave module
 
-namespace {
-
-PID slave;
-string webuiPort;
-string logDir;
-string workDir;
-
-}
 
 namespace mesos { namespace internal { namespace slave {
 
+static PID<Slave> slave;
+static string webuiPort;
+static string logDir;
+static string workDir;
+
 
-void *runSlaveWebUI(void *)
+void* runSlaveWebUI(void*)
 {
   LOG(INFO) << "Web UI thread started";
   Py_Initialize();
@@ -50,19 +53,19 @@ void *runSlaveWebUI(void *)
 }
 
 
-void startSlaveWebUI(const PID &slave, const Params &params)
+void startSlaveWebUI(const PID<Slave>& _slave, const Configuration& conf)
 {
   // TODO(*): See the note in master/webui.cpp about having to
   // determine default values. These should be set by now and can just
   // be used! For example, what happens when the slave code changes
   // their default location for the work directory, it might not get
   // changed here!
-  webuiPort = params.get("webui_port", "8081");
-  logDir = params.get("log_dir", FLAGS_log_dir);
-  if (params.contains("work_dir")) {
-    workDir = params.get("work_dir", "");
-  } else if (params.contains("home")) {
-    workDir = params.get("home", "") + "/work";
+  webuiPort = conf.get("webui_port", "8081");
+  logDir = conf.get("log_dir", FLAGS_log_dir);
+  if (conf.contains("work_dir")) {
+    workDir = conf.get("work_dir", "");
+  } else if (conf.contains("home")) {
+    workDir = conf.get("home", "") + "/work";
   } else {
     workDir = "work";
   }
@@ -71,7 +74,7 @@ void startSlaveWebUI(const PID &slave, c
 
   LOG(INFO) << "Starting slave web UI on port " << webuiPort;
 
-  ::slave = slave;
+  slave = _slave;
   pthread_t thread;
   pthread_create(&thread, 0, runSlaveWebUI, NULL);
 }
@@ -79,36 +82,15 @@ void startSlaveWebUI(const PID &slave, c
 
 namespace state {
 
-class StateGetter : public MesosProcess
-{
-public:
-  SlaveState *slaveState;
-
-  StateGetter() {}
-  ~StateGetter() {}
-
-  void operator () ()
-  {
-    send(::slave, pack<S2S_GET_STATE>());
-    receive();
-    CHECK(msgid() == S2S_GET_STATE_REPLY);
-    slaveState = unpack<S2S_GET_STATE_REPLY, 0>(body());
-  }
-};
-
-
-// From slave_state.hpp
-SlaveState *get_slave()
+// From slave_state.hpp.
+SlaveState* get_slave()
 {
-  StateGetter getter;
-  PID pid = Process::spawn(&getter);
-  Process::wait(pid);
-  return getter.slaveState;
+  return process::call(slave, &Slave::getState);
 }
 
-} /* namespace state { */
+} // namespace state {
 
-}}} /* namespace mesos { namespace internal { namespace slave { */
+}}} // namespace mesos { namespace internal { namespace slave {
 
 
-#endif /* MESOS_WEBUI */
+#endif // MESOS_WEBUI

Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Mon Jun 27 06:08:33 2011
@@ -2,6 +2,8 @@
 
 SHELL = '/bin/sh'
 
+SRCDIR = @srcdir@
+INCLUDEDIR = @top_builddir@/include
 BINDIR = @top_builddir@/bin
 LIBDIR = @top_builddir@/lib
 
@@ -18,12 +20,11 @@ WITH_ZOOKEEPER = @WITH_ZOOKEEPER@
 WITH_INCLUDED_ZOOKEEPER = @WITH_INCLUDED_ZOOKEEPER@
 
 LIBPROCESS = third_party/libprocess
-
 LIBEV = $(LIBPROCESS)/third_party/libev-3.8
-
+BOOST = third_party/boost-1.37.0
+PROTOBUF = third_party/protobuf-2.3.0
 GLOG = third_party/glog-0.3.1
 GMOCK = third_party/gmock-1.5.0
-
 ZOOKEEPER = third_party/zookeeper-3.3.1/src/c
 
 # Ensure that we get better debugging info.
@@ -34,13 +35,17 @@ CXXFLAGS += -g
 CFLAGS += -I@srcdir@/.. -I..
 CXXFLAGS += -I@srcdir@/.. -I..
 
-# Add include to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/include
-CXXFLAGS += -I@top_srcdir@/include
+# Add include and build include to CFLAGS and CXXFLAGS.
+CFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
+CXXFLAGS += -I@top_srcdir@/include -I$(INCLUDEDIR)
 
 # Add boost to CFLAGS and CXXFLAGS.
-CFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
-CXXFLAGS += -I@top_srcdir@/third_party/boost-1.37.0
+CFLAGS += -I@top_srcdir@/$(BOOST)
+CXXFLAGS += -I@top_srcdir@/$(BOOST)
+
+# Add protobuf to include and lib paths.
+CXXFLAGS += -I@top_srcdir@/$(PROTOBUF)/src
+LDFLAGS += -L@top_builddir@/$(PROTOBUF)/src/.libs
 
 # Add libprocess to CFLAGS, CXXFLAGS, and LDFLAGS.
 CFLAGS += -I@top_srcdir@/$(LIBPROCESS)/include
@@ -64,29 +69,23 @@ endif
 CFLAGS += -MMD -MP
 CXXFLAGS += -MMD -MP
 
-# Add build date to CFLAGS, CXXFLAGS
-CFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
-CXXFLAGS += -DBUILD_DATE="\"$$(date '+%Y-%m-%d %H:%M:%S')\""
-
-# Add build user to CFLAGS, CXXFLAGS
-CFLAGS += -DBUILD_USER="\"$$USER\""
-CXXFLAGS += -DBUILD_USER="\"$$USER\""
-
-# Add glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
-LIBS += -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
+# Add protobuf, glog, gmock, gtest, libev, libprocess, pthread, and dl to LIBS.
+LIBS += -lprotobuf -lglog -lgmock -lgtest -lprocess -lev -lpthread -ldl
 
 # Add ZooKeeper if necessary.
 ifeq ($(WITH_ZOOKEEPER),1)
-  LIBS += -lzookeeper_st
+  LIBS += -lzookeeper_mt
 endif
 
 SCHED_LIB = $(LIBDIR)/libmesos_sched.a
 EXEC_LIB = $(LIBDIR)/libmesos_exec.a
 
-TESTS_OBJ = main.o utils.o master_test.o offer_reply_errors_test.o	\
-	    resources_test.o external_test.o sample_frameworks_test.o	\
-	    configurator_test.o string_utils_test.o			\
-	    lxc_isolation_test.o
+TESTS_OBJ = main.o utils.o master_tests.o resource_offer_tests.o	\
+	    resource_offer_reply_tests.o fault_tolerant_tests.o		\
+	    resources_tests.o uuid_tests.o external_tests.o		\
+	    sample_frameworks_tests.o configurator_tests.o		\
+	    string_utils_tests.o multimap_tests.o			\
+	    lxc_isolation_tests.o
 
 ALLTESTS_EXE = $(BINDIR)/tests/all-tests
 
@@ -95,7 +94,6 @@ EXTERNAL_SCRIPTS =							\
   $(BINDIR)/tests/external/LxcIsolation/ScaleUpAndDown.sh		\
   $(BINDIR)/tests/external/LxcIsolation/TwoSeparateTasks.sh		\
   $(BINDIR)/tests/external/LxcIsolation/run_scheduled_memhog_test.sh	\
-  $(BINDIR)/tests/external/SampleFrameworks/CFramework.sh		\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkCmdlineParsing.sh	\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidCmdline.sh	\
   $(BINDIR)/tests/external/SampleFrameworks/CFrameworkInvalidEnv.sh	\

Added: incubator/mesos/trunk/src/tests/configurator_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/configurator_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/configurator_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/configurator_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,304 @@
+#include <gtest/gtest.h>
+
+#include <fstream>
+
+#include <boost/lexical_cast.hpp>
+
+#include "configurator/configurator.hpp"
+
+#include "tests/utils.hpp"
+
+using boost::lexical_cast;
+
+using std::ofstream;
+using std::string;
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+
+TEST(ConfiguratorTest, Environment)
+{
+  setenv("MESOS_TEST", "working", true);
+  Configurator conf;
+  conf.load();
+  unsetenv("MESOS_TEST");
+
+  EXPECT_EQ("working", conf.getConfiguration()["test"]);
+}
+
+
+TEST(ConfiguratorTest, DefaultOptions)
+{
+  const int ARGC = 5;
+  char* argv[ARGC];
+  argv[0] = (char*) "bin/filename";
+  argv[1] = (char*) "--test1=501";
+  argv[2] = (char*) "--test2";
+  argv[3] = (char*) "--excp=txt";
+  argv[4] = (char*) "--test8=foo";
+
+  Configurator conf;
+
+  EXPECT_NO_THROW(conf.addOption<int>("test1", "Testing option", 500));
+  EXPECT_NO_THROW(conf.addOption<bool>("test2", "Another tester", 0));
+  EXPECT_NO_THROW(conf.addOption<long>("test3", "Tests the default", 2010));
+  EXPECT_NO_THROW(conf.addOption<string>("test4", "Option without default"));
+  EXPECT_NO_THROW(conf.addOption<string>("test5", "Option with a default", 
+                                         "default"));
+  EXPECT_NO_THROW(conf.addOption<bool>("test6", "Toggler...", false));
+  EXPECT_NO_THROW(conf.addOption<bool>("test7", "Toggler...", true));
+  EXPECT_NO_THROW(conf.addOption<string>("test8", "Overridden default", 
+                                         "default"));
+  EXPECT_NO_THROW(conf.load(ARGC, argv, false));
+
+  EXPECT_NO_THROW(conf.addOption<int>("excp", "Exception tester.", 50));
+  EXPECT_THROW(conf.validate(), ConfigurationException);
+  conf.getConfiguration()["excp"] = "27";
+  EXPECT_NO_THROW(conf.validate());
+
+  EXPECT_EQ("501",     conf.getConfiguration()["test1"]);
+  EXPECT_EQ("1",       conf.getConfiguration()["test2"]);
+  EXPECT_EQ("2010",    conf.getConfiguration()["test3"]);
+  EXPECT_EQ("",        conf.getConfiguration()["test4"]);
+  EXPECT_EQ("default", conf.getConfiguration()["test5"]);
+  EXPECT_EQ("27",      conf.getConfiguration()["excp"]);
+  EXPECT_EQ("0",       conf.getConfiguration()["test6"]);
+  EXPECT_EQ("1",       conf.getConfiguration()["test7"]);
+  EXPECT_EQ("foo",     conf.getConfiguration()["test8"]);
+}
+
+
+TEST(ConfiguratorTest, CommandLine)
+{
+  const int ARGC = 12;
+  char* argv[ARGC];
+  argv[0] =  (char*) "bin/filename";
+  argv[1] =  (char*) "--test1=text1";
+  argv[2] =  (char*) "--test2";
+  argv[3] =  (char*) "text2";
+  argv[4] =  (char*) "-N";
+  argv[5] =  (char*) "-25";
+  argv[6] =  (char*) "--cAsE=4";
+  argv[7] =  (char*) "--space=Long String";
+  argv[8] =  (char*) "--bool1";
+  argv[9] =  (char*) "--no-bool2";
+  argv[10] = (char*) "-a";
+  argv[11] = (char*) "-no-b";
+
+  Configurator conf;
+  EXPECT_NO_THROW(conf.addOption<int>("negative", 'N', "some val", -30));
+  EXPECT_NO_THROW(conf.addOption<string>("test1", "textual value", "text2"));
+  EXPECT_NO_THROW(conf.addOption<bool>("bool1", "toggler", false));
+  EXPECT_NO_THROW(conf.addOption<bool>("bool2", 'z', "toggler", true));
+  EXPECT_NO_THROW(conf.addOption<bool>("bool3", 'a', "toggler", false));
+  EXPECT_NO_THROW(conf.addOption<bool>("bool4", 'b', "toggler", true));
+
+  EXPECT_NO_THROW( conf.load(ARGC, argv, false) );
+
+  EXPECT_EQ("text1",       conf.getConfiguration()["test1"]);
+  EXPECT_EQ("1",           conf.getConfiguration()["test2"]);
+  EXPECT_EQ("-25",         conf.getConfiguration()["negative"]);
+  EXPECT_EQ("4",           conf.getConfiguration()["case"]);
+  EXPECT_EQ("Long String", conf.getConfiguration()["space"]);
+  EXPECT_EQ("1",           conf.getConfiguration()["bool1"]);
+  EXPECT_EQ("0",           conf.getConfiguration()["bool2"]);
+  EXPECT_EQ("1",           conf.getConfiguration()["bool3"]);
+  EXPECT_EQ("0",           conf.getConfiguration()["bool4"]);
+}
+
+
+// Check whether specifying MESOS_HOME allows a config file to be loaded
+// from the default config directory (MESOS_HOME/conf)
+TEST_WITH_WORKDIR(ConfiguratorTest, ConfigFileWithMesosHome)
+{
+  if (mkdir("bin", 0755) != 0)
+    FAIL() << "Failed to create directory bin";
+  if (mkdir("conf", 0755) != 0)
+    FAIL() << "Failed to create directory conf";
+  ofstream file("conf/mesos.conf");
+  file << "test1=coffee # beans are tasty\n";
+  file << "# just a comment\n";
+  file << "test2=tea\n";
+  file.close();
+
+  setenv("MESOS_HOME", ".", 1);
+  Configurator conf;
+  char* argv[1] = { (char*) "bin/foo" };
+  EXPECT_NO_THROW( conf.load(1, argv, true) );
+  unsetenv("MESOS_HOME");
+
+  EXPECT_EQ("coffee", conf.getConfiguration()["test1"]);
+  EXPECT_EQ("tea",    conf.getConfiguration()["test2"]);
+}
+  
+
+// Check whether specifying just MESOS_CONF allows a config file to be loaded
+TEST_WITH_WORKDIR(ConfiguratorTest, ConfigFileWithConfDir)
+{
+  if (mkdir("conf2", 0755) != 0)
+    FAIL() << "Failed to create directory conf2";
+  ofstream file("conf2/mesos.conf");
+  file << "test3=shake # sugar bomb\n";
+  file << "# just a comment\n";
+  file << "test4=milk\n";
+  file.close();
+  setenv("MESOS_CONF", "conf2", 1);
+  Configurator conf;
+  EXPECT_NO_THROW( conf.load() );
+  unsetenv("MESOS_CONF");
+
+  EXPECT_EQ("shake", conf.getConfiguration()["test3"]);
+  EXPECT_EQ("milk",  conf.getConfiguration()["test4"]);
+}
+
+
+// Check that setting MESOS_CONF variable overrides the default location
+// of conf directory relative in MESOS_HOME/conf.
+TEST_WITH_WORKDIR(ConfiguratorTest, ConfigFileWithHomeAndDir)
+{
+  if (mkdir("bin", 0755) != 0)
+    FAIL() << "Failed to create directory bin";
+  if (mkdir("conf2", 0755) != 0)
+    FAIL() << "Failed to create directory conf2";
+  ofstream file("conf2/mesos.conf");
+  file << "test3=shake # sugar bomb\n";
+  file << "# just a comment\n";
+  file << "test4=milk\n";
+  file.close();
+
+  setenv("MESOS_CONF", "conf2", 1);
+  Configurator conf;
+  char* argv[1] = { (char*) "bin/foo" };
+  EXPECT_NO_THROW( conf.load(1, argv, true) );
+  unsetenv("MESOS_CONF");
+
+  EXPECT_EQ("shake", conf.getConfiguration()["test3"]);
+  EXPECT_EQ("milk",  conf.getConfiguration()["test4"]);
+}
+
+
+// Check that when we specify a conf directory on the command line,
+// we load values from the config file first and then the command line
+TEST_WITH_WORKDIR(ConfiguratorTest, CommandLineConfFlag)
+{
+  if (mkdir("bin", 0755) != 0)
+    FAIL() << "Failed to create directory bin";
+  if (mkdir("conf2", 0755) != 0)
+    FAIL() << "Failed to create directory conf2";
+  ofstream file("conf2/mesos.conf");
+  file << "a=1\n";
+  file << "b=2\n";
+  file << "c=3";
+  file.close();
+
+  const int ARGC = 4;
+  char* argv[ARGC];
+  argv[0] = (char*) "bin/filename";
+  argv[1] = (char*) "--conf=conf2";
+  argv[2] = (char*) "--b=overridden";
+  argv[3] = (char*) "--d=fromCmdLine";
+
+  Configurator conf;
+  EXPECT_NO_THROW( conf.load(ARGC, argv, false) );
+
+  EXPECT_EQ("1",           conf.getConfiguration()["a"]);
+  EXPECT_EQ("overridden",  conf.getConfiguration()["b"]);
+  EXPECT_EQ("3",           conf.getConfiguration()["c"]);
+  EXPECT_EQ("fromCmdLine", conf.getConfiguration()["d"]);
+}
+
+
+// Check that variables are loaded with the correct priority when an
+// environment variable, a config file element , and a config flag
+// are all present. Command line flags should have the highest priority,
+// second should be environment variables, and last should be the file.
+TEST_WITH_WORKDIR(ConfiguratorTest, LoadingPriorities)
+{
+  // Create a file which contains parameters a, b, c and d
+  if (mkdir("bin", 0755) != 0)
+    FAIL() << "Failed to create directory bin";
+  if (mkdir("conf", 0755) != 0)
+    FAIL() << "Failed to create directory conf";
+  ofstream file("conf/mesos.conf");
+  file << "a=fromFile\n";
+  file << "b=fromFile\n";
+  file << "c=fromFile\n";
+  file << "d=fromFile\n";
+  file.close();
+
+  // Set environment to contain parameters a and b
+  setenv("MESOS_A", "fromEnv", 1);
+  setenv("MESOS_B", "fromEnv", 1);
+
+  // Pass parameters a and c from the command line
+  const int ARGC = 3;
+  char* argv[ARGC];
+  argv[0] = (char*) "bin/filename";
+  argv[1] = (char*) "--a=fromCmdLine";
+  argv[2] = (char*) "--c=fromCmdLine";
+
+  Configurator conf;
+  EXPECT_NO_THROW( conf.load(ARGC, argv, true) );
+
+  // Clear the environment vars set above
+  unsetenv("MESOS_A");
+  unsetenv("MESOS_B");
+
+  // Check that every variable is obtained from the highest-priority location
+  // (command line > env > file)
+  EXPECT_EQ("fromCmdLine", conf.getConfiguration()["a"]);
+  EXPECT_EQ("fromEnv",     conf.getConfiguration()["b"]);
+  EXPECT_EQ("fromCmdLine", conf.getConfiguration()["c"]);
+  EXPECT_EQ("fromFile",    conf.getConfiguration()["d"]);
+}
+
+
+// Check that spaces before and after the = signs in config files are ignored
+TEST_WITH_WORKDIR(ConfiguratorTest, ConfigFileSpacesIgnored)
+{
+  if (mkdir("conf", 0755) != 0)
+    FAIL() << "Failed to create directory conf";
+  ofstream file("conf/mesos.conf");
+  file << "test1=coffee # beans are tasty\n";
+  file << "# just a comment\n";
+  file << "  \t # comment with spaces in front\n";
+  file << "\n";
+  file << "test2 =tea\n";
+  file << "test3=  water\n";
+  file << "   test4 =  milk\n";
+  file << "  test5 =  hot  chocolate\t\n";
+  file << "\ttest6 =  juice# #\n";
+  file.close();
+
+  Configurator conf;
+  setenv("MESOS_CONF", "conf", 1);
+  EXPECT_NO_THROW(conf.load());
+  unsetenv("MESOS_CONF");
+
+  EXPECT_EQ("coffee",         conf.getConfiguration()["test1"]);
+  EXPECT_EQ("tea",            conf.getConfiguration()["test2"]);
+  EXPECT_EQ("water",          conf.getConfiguration()["test3"]);
+  EXPECT_EQ("milk",           conf.getConfiguration()["test4"]);
+  EXPECT_EQ("hot  chocolate", conf.getConfiguration()["test5"]);
+  EXPECT_EQ("juice",          conf.getConfiguration()["test6"]);
+}
+
+
+// Check that exceptions are thrown on invalid config file
+TEST_WITH_WORKDIR(ConfiguratorTest, MalformedConfigFile)
+{
+  if (mkdir("conf", 0755) != 0)
+    FAIL() << "Failed to create directory conf";
+  ofstream file("conf/mesos.conf");
+  file << "test1=coffee\n";
+  file << "JUNK\n";
+  file << "test2=tea\n";
+  file.close();
+
+  setenv("MESOS_CONF", "conf", 1);
+  Configurator conf;
+  EXPECT_THROW(conf.load(), ConfigurationException);
+  unsetenv("MESOS_CONF");
+}

Modified: incubator/mesos/trunk/src/tests/external/LxcIsolation/run_scheduled_memhog_test.sh
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/external/LxcIsolation/run_scheduled_memhog_test.sh?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/external/LxcIsolation/run_scheduled_memhog_test.sh (original)
+++ incubator/mesos/trunk/src/tests/external/LxcIsolation/run_scheduled_memhog_test.sh Mon Jun 27 06:08:33 2011
@@ -13,7 +13,7 @@ if [[ $EUID -ne 0 ]]; then
 fi
 
 # Launch master
-$MESOS_HOME/mesos-master -p 5432 > master.log 2>&1 &
+$MESOS_HOME/mesos-master --port=5432 > master.log 2>&1 &
 MASTER_PID=$!
 echo "Launched master, PID = $MASTER_PID"
 sleep 2
@@ -27,8 +27,11 @@ if [[ $KILL_EXIT_CODE -ne 0 ]]; then
 fi
 
 # Launch slave
-$MESOS_HOME/mesos-slave -u 1@$HOSTNAME:5432 -i lxc \
-                        -c 2 -m $[512*1024*1024] > slave.log 2>&1 &
+$MESOS_HOME/mesos-slave \
+    --url=master@localhost:5432 \
+    --isolation=lxc \
+    --resources="cpus:2;mem:$[512*1024*1024]" \
+    > slave.log 2>&1 &
 SLAVE_PID=$!
 echo "Launched slave, PID = $SLAVE_PID"
 sleep 2
@@ -44,7 +47,7 @@ fi
 
 # Launch memhog
 echo "Running scheduled-memhog"
-$MESOS_HOME/scheduled-memhog 1@$HOSTNAME:5432 schedule > memhog.log 2>&1
+$MESOS_HOME/examples/scheduled-memhog master@localhost:5432 schedule > memhog.log 2>&1
 EXIT_CODE=$?
 echo "Memhog exit code: $?"
 sleep 2

Added: incubator/mesos/trunk/src/tests/external_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/external_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/external_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/external_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,59 @@
+#include <stdlib.h>
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include <boost/lexical_cast.hpp>
+
+#include "common/fatal.hpp"
+
+#include "tests/external_test.hpp"
+#include "tests/utils.hpp"
+
+using namespace mesos::internal::test;
+
+using std::string;
+
+
+/**
+ * Run an external test with the given name. The test is expected to be
+ * located in src/tests/external/<testCase>/<testName>.sh.
+ * We execute this script in directory test_output/<testCase>/<testName>,
+ * piping its output to files called stdout and stderr, and the test
+ * passes if the script returns 0.
+ */
+void mesos::internal::test::runExternalTest(const char* testCase,
+                                            const char* testName)
+{
+  // Remove DISABLED_ prefix from test name if this is a disabled test
+  if (strncmp(testName, "DISABLED_", strlen("DISABLED_")) == 0)
+    testName += strlen("DISABLED_");
+  // Create and go into the test's work directory
+  enterTestDirectory(testCase, testName);
+  // Figure out the absolute path to the test script
+  string script = mesosHome + "/bin/tests/external/" + testCase
+                             + "/" + testName + ".sh";
+  // Fork a process to change directory and run the test
+  pid_t pid;
+  if ((pid = fork()) == -1) {
+    FAIL() << "Failed to fork to launch external test";
+  }
+  if (pid) {
+    // In parent process
+    int exitCode;
+    wait(&exitCode);
+    ASSERT_EQ(0, exitCode) << "External test " << testName << " failed";
+  } else {
+    // In child process. Redirect standard output and error to files,
+    // set MESOS_HOME environment variable, and exec the test script.
+    if (freopen("stdout", "w", stdout) == NULL)
+      fatalerror("freopen failed");
+    if (freopen("stderr", "w", stderr) == NULL)
+      fatalerror("freopen failed");
+    setenv("MESOS_HOME", mesosHome.c_str(), 1);
+    execl(script.c_str(), script.c_str(), (char*) NULL);
+    // If we get here, execl failed; report the error
+    fatalerror("Could not execute %s", script.c_str());
+  }
+}

Added: incubator/mesos/trunk/src/tests/fault_tolerant_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerant_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerant_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/fault_tolerant_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,508 @@
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include "detector/detector.hpp"
+
+#include "local/local.hpp"
+
+#include "master/master.hpp"
+
+#include "slave/process_based_isolation_module.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::ProcessBasedIsolationModule;
+using mesos::internal::slave::Slave;
+using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_SECONDS;
+
+using process::PID;
+
+using std::string;
+using std::map;
+using std::vector;
+
+using testing::_;
+using testing::AnyOf;
+using testing::AtMost;
+using testing::DoAll;
+using testing::ElementsAre;
+using testing::Eq;
+using testing::Not;
+using testing::Return;
+using testing::SaveArg;
+
+
+TEST(MasterTest, SlaveLost)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  ProcessBasedIsolationModule isolationModule;
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+  
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+
+  trigger resourceOfferCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  trigger offerRescindedCall, slaveLostCall;
+
+  EXPECT_CALL(sched, offerRescinded(&driver, offerId))
+    .WillOnce(Trigger(&offerRescindedCall));
+
+  EXPECT_CALL(sched, slaveLost(&driver, offers[0].slave_id()))
+    .WillOnce(Trigger(&slaveLostCall));
+
+  process::post(slave, process::TERMINATE);
+
+  WAIT_UNTIL(offerRescindedCall);
+  WAIT_UNTIL(slaveLostCall);
+
+  driver.stop();
+  driver.join();
+
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}
+
+
+TEST(MasterTest, SlavePartitioned)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  process::Clock::pause();
+
+  MockFilter filter;
+  process::filter(&filter);
+
+  EXPECT_MSG(filter, _, _, _)
+    .WillRepeatedly(Return(false));
+
+  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  trigger slaveLostCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(Trigger(&slaveLostCall));
+
+  EXPECT_MSG(filter, Eq("PONG"), _, _)
+    .WillRepeatedly(Return(true));
+
+  driver.start();
+
+  double secs = master::SLAVE_PONG_TIMEOUT * master::MAX_SLAVE_TIMEOUTS;
+
+  process::Clock::advance(secs);
+
+  WAIT_UNTIL(slaveLostCall);
+
+  driver.stop();
+  driver.join();
+
+  local::shutdown();
+
+  process::filter(NULL);
+
+  process::Clock::resume();
+}
+
+
+TEST(MasterTest, SchedulerFailover)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  // Launch the first (i.e., failing) scheduler and wait until
+  // registered gets called to launch the second (i.e., failover)
+  // scheduler.
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  FrameworkID frameworkId;
+
+  trigger sched1RegisteredCall;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .WillOnce(DoAll(SaveArg<1>(&frameworkId), Trigger(&sched1RegisteredCall)));
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched1, offerRescinded(&driver1, _))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
+    .Times(1);
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1RegisteredCall);
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and wait until it
+  // gets a registered callback..
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, master, frameworkId);
+
+  trigger sched2RegisteredCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId))
+    .WillOnce(Trigger(&sched2RegisteredCall));
+
+  EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+    .Times(AtMost(1));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2RegisteredCall);
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  local::shutdown();
+}
+
+
+TEST(MasterTest, SchedulerFailoverStatusUpdate)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  process::Clock::pause();
+
+  MockFilter filter;
+  process::filter(&filter);
+
+  EXPECT_MSG(filter, _, _, _)
+    .WillRepeatedly(Return(false));
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  EXPECT_CALL(exec, init(_, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdate(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  // Launch the first (i.e., failing) scheduler and wait until the
+  // first status update message is sent to it (drop the message).
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  FrameworkID frameworkId;
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+
+  trigger resourceOfferCall, statusUpdateMsg;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .Times(0);
+
+  EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
+    .Times(1);
+
+  EXPECT_MSG(filter, Eq(StatusUpdateMessage().GetTypeName()), _,
+             Not(AnyOf(Eq(master), Eq(slave))))
+    .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
+    .RetiresOnSaturation();
+
+  driver1.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  driver1.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(statusUpdateMsg);
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler and wait until it
+  // registers, at which point advance time enough for the reliable
+  // timeout to kick in and another status update message is sent.
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, master, frameworkId);
+
+  trigger registeredCall, statusUpdateCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId))
+    .WillOnce(Trigger(&registeredCall));
+
+  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(Trigger(&statusUpdateCall));
+
+  driver2.start();
+
+  WAIT_UNTIL(registeredCall);
+
+  process::Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_SECONDS);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+
+  process::filter(NULL);
+
+  process::Clock::resume();
+}
+
+
+TEST(MasterTest, SchedulerFailoverFrameworkMessage)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  ExecutorDriver* execDriver;
+
+  EXPECT_CALL(exec, init(_, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdate(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  FrameworkID frameworkId;
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
+
+  trigger sched1ResourceOfferCall, sched1StatusUpdateCall;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&sched1StatusUpdateCall)));
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&sched1ResourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
+    .Times(1);
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1ResourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  driver1.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(sched1StatusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, master, frameworkId);
+
+  trigger sched2RegisteredCall, sched2FrameworkMessageCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId))
+    .WillOnce(Trigger(&sched2RegisteredCall));
+
+  EXPECT_CALL(sched2, frameworkMessage(&driver2, _, _, _))
+    .WillOnce(Trigger(&sched2FrameworkMessageCall));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2RegisteredCall);
+
+  execDriver->sendFrameworkMessage("");
+
+  WAIT_UNTIL(sched2FrameworkMessageCall);
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}

Added: incubator/mesos/trunk/src/tests/lxc_isolation_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/lxc_isolation_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/lxc_isolation_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/lxc_isolation_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,13 @@
+#include <gtest/gtest.h>
+
+#include "tests/external_test.hpp"
+
+
+// Run a number of tests for the LXC isolation module.
+// These tests are disabled by default since they require alltests to be run
+// with sudo for Linux Container commands to be usable (and of course, they
+// also require a Linux version with LXC support).
+// You can enable them using ./alltests --gtest_also_run_disabled_tests.
+TEST_EXTERNAL(LxcIsolation, DISABLED_TwoSeparateTasks)
+TEST_EXTERNAL(LxcIsolation, DISABLED_ScaleUpAndDown)
+TEST_EXTERNAL(LxcIsolation, DISABLED_HoldMoreMemThanRequested)

Modified: incubator/mesos/trunk/src/tests/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/main.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/main.cpp (original)
+++ incubator/mesos/trunk/src/tests/main.cpp Mon Jun 27 06:08:33 2011
@@ -51,6 +51,8 @@ int main(int argc, char** argv)
   // Get absolute path to Mesos home directory based on location of alltests
   mesos::internal::test::mesosHome = getMesosHome(argc, argv);
 
+  std::cout << "MESOS_HOME: " << mesos::internal::test::mesosHome << std::endl;
+
   // Clear any MESOS_ environment variables so they don't affect our tests
   Configurator::clearMesosEnvironmentVars();
 

Added: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,485 @@
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include "detector/detector.hpp"
+
+#include "local/local.hpp"
+
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::PID;
+
+using std::string;
+using std::map;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Eq;
+using testing::Return;
+using testing::SaveArg;
+
+
+TEST(MasterTest, TaskRunning)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  EXPECT_CALL(exec, init(_, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdate(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
+
+  trigger resourceOfferCall, statusUpdateCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  driver.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  driver.stop();
+  driver.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}
+
+
+TEST(MasterTest, KillTask)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  trigger killTaskCall;
+
+  EXPECT_CALL(exec, init(_, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdate(TASK_RUNNING));
+
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(Trigger(&killTaskCall));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
+
+  trigger resourceOfferCall, statusUpdateCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskID taskId;
+  taskId.set_value("1");
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->MergeFrom(taskId);
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  driver.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  driver.killTask(taskId);
+
+  WAIT_UNTIL(killTaskCall);
+
+  driver.stop();
+  driver.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}
+
+
+TEST(MasterTest, FrameworkMessage)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec;
+
+  ExecutorDriver* execDriver;
+  ExecutorArgs args;
+  string execData;
+
+  trigger execFrameworkMessageCall;
+
+  EXPECT_CALL(exec, init(_, _))
+    .WillOnce(DoAll(SaveArg<0>(&execDriver), SaveArg<1>(&args)));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdate(TASK_RUNNING));
+
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&execData),
+                    Trigger(&execFrameworkMessageCall)));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  map<ExecutorID, Executor*> execs;
+  execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  // Launch the first (i.e., failing) scheduler and wait until the
+  // first status update message is sent to it (drop the message).
+
+  MockScheduler sched;
+  MesosSchedulerDriver schedDriver(&sched, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status;
+  string schedData;
+
+  trigger resourceOfferCall, statusUpdateCall, schedFrameworkMessageCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&schedDriver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&schedDriver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&schedDriver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&schedDriver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)));
+
+  EXPECT_CALL(sched, frameworkMessage(&schedDriver, _, _, _))
+    .WillOnce(DoAll(SaveArg<3>(&schedData),
+                    Trigger(&schedFrameworkMessageCall)));
+
+  schedDriver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers[0].resources());
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  schedDriver.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  string hello = "hello";
+
+  schedDriver.sendFrameworkMessage(offers[0].slave_id(),
+				   DEFAULT_EXECUTOR_ID,
+				   hello);
+
+  WAIT_UNTIL(execFrameworkMessageCall);
+
+  EXPECT_EQ(hello, execData);
+
+  string reply = "reply";
+
+  execDriver->sendFrameworkMessage(reply);
+
+  WAIT_UNTIL(schedFrameworkMessageCall);
+
+  EXPECT_EQ(reply, schedData);
+
+  schedDriver.stop();
+  schedDriver.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}
+
+
+TEST(MasterTest, MultipleExecutors)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Master m;
+  PID<Master> master = process::spawn(&m);
+
+  MockExecutor exec1;
+  TaskDescription exec1Task;
+  trigger exec1LaunchTaskCall;
+
+  EXPECT_CALL(exec1, init(_, _))
+    .Times(1);
+
+  EXPECT_CALL(exec1, launchTask(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&exec1Task),
+                    Trigger(&exec1LaunchTaskCall),
+                    SendStatusUpdate(TASK_RUNNING)));
+
+  EXPECT_CALL(exec1, shutdown(_))
+    .Times(AtMost(1));
+
+  MockExecutor exec2;
+  TaskDescription exec2Task;
+  trigger exec2LaunchTaskCall;
+
+  EXPECT_CALL(exec2, init(_, _))
+    .Times(1);
+
+  EXPECT_CALL(exec2, launchTask(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&exec2Task),
+                    Trigger(&exec2LaunchTaskCall),
+                    SendStatusUpdate(TASK_RUNNING)));
+
+  EXPECT_CALL(exec2, shutdown(_))
+    .Times(AtMost(1));
+
+  ExecutorID executorId1;
+  executorId1.set_value("executor-1");
+
+  ExecutorID executorId2;
+  executorId2.set_value("executor-2");
+
+  map<ExecutorID, Executor*> execs;
+  execs[executorId1] = &exec1;
+  execs[executorId2] = &exec2;
+
+  TestingIsolationModule isolationModule(execs);
+
+  Resources resources = Resources::parse("cpus:2;mem:1024");
+
+  Slave s(resources, true, &isolationModule);
+  PID<Slave> slave = process::spawn(&s);
+
+  BasicMasterDetector detector(master, slave, true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+  TaskStatus status1, status2;
+
+  trigger resourceOfferCall, statusUpdateCall1, statusUpdateCall2;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(SaveArg<1>(&status1), Trigger(&statusUpdateCall1)))
+    .WillOnce(DoAll(SaveArg<1>(&status2), Trigger(&statusUpdateCall2)));
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  ASSERT_NE(0, offers.size());
+
+  TaskDescription task1;
+  task1.set_name("");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
+  task1.mutable_executor()->mutable_executor_id()->MergeFrom(executorId1);
+  task1.mutable_executor()->set_uri("noexecutor");
+
+  TaskDescription task2;
+  task2.set_name("");
+  task2.mutable_task_id()->set_value("2");
+  task2.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+  task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
+  task2.mutable_executor()->mutable_executor_id()->MergeFrom(executorId2);
+  task2.mutable_executor()->set_uri("noexecutor");
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task1);
+  tasks.push_back(task2);
+
+  driver.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(statusUpdateCall1);
+
+  EXPECT_EQ(TASK_RUNNING, status1.state());
+
+  WAIT_UNTIL(statusUpdateCall2);
+
+  EXPECT_EQ(TASK_RUNNING, status2.state());
+
+  WAIT_UNTIL(exec1LaunchTaskCall);
+
+  EXPECT_EQ(task1.task_id(), exec1Task.task_id());
+
+  WAIT_UNTIL(exec2LaunchTaskCall);
+
+  EXPECT_EQ(task2.task_id(), exec2Task.task_id());
+
+  driver.stop();
+  driver.join();
+
+  process::post(slave, process::TERMINATE);
+  process::wait(slave);
+
+  process::post(master, process::TERMINATE);
+  process::wait(master);
+}

Added: incubator/mesos/trunk/src/tests/multimap_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/multimap_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/multimap_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/multimap_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,126 @@
+#include <gtest/gtest.h>
+
+#include <stdint.h>
+
+#include <string>
+
+#include "common/foreach.hpp"
+#include "common/multimap.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::string;
+
+
+TEST(Multimap, Insert)
+{
+  multimap<string, uint16_t> map;
+
+  map.insert("foo", 1024);
+  ASSERT_EQ(1, map.count("foo"));
+
+  map.insert("foo", 1025);
+  ASSERT_EQ(2, map.count("foo"));
+
+  ASSERT_EQ(1, map.size());
+
+  map.insert("bar", 1024);
+  ASSERT_EQ(1, map.count("bar"));
+
+  map.insert("bar", 1025);
+  ASSERT_EQ(2, map.count("bar"));
+
+  ASSERT_EQ(2, map.size());
+}
+
+
+TEST(Multimap, Erase)
+{
+  multimap<string, uint16_t> map;
+
+  map.insert("foo", 1024);
+  map.erase("foo", 1024);
+  ASSERT_EQ(0, map.count("foo"));
+
+  ASSERT_EQ(0, map.size());
+
+  map.insert("foo", 1024);
+  map.insert("foo", 1025);
+  ASSERT_EQ(2, map.count("foo"));
+
+  ASSERT_EQ(1, map.size());
+
+  map.erase("foo");
+  ASSERT_EQ(0, map.count("foo"));
+  ASSERT_EQ(0, map.size());
+}
+
+
+TEST(Multimap, Count)
+{
+  multimap<string, uint16_t> map;
+
+  map.insert("foo", 1024);
+  map.insert("foo", 1025);
+  ASSERT_EQ(2, map.count("foo"));
+  ASSERT_EQ(1, map.count("foo", 1024));
+  ASSERT_EQ(1, map.count("foo", 1025));
+
+  map.insert("bar", 1024);
+  map.insert("bar", 1025);
+  ASSERT_EQ(2, map.count("bar"));
+  ASSERT_EQ(1, map.count("bar", 1024));
+  ASSERT_EQ(1, map.count("bar", 1025));
+}
+
+
+TEST(Multimap, Iterator)
+{
+  multimap<string, uint16_t> map;
+
+  map.insert("foo", 1024);
+  map.insert("foo", 1025);
+  ASSERT_EQ(2, map.count("foo"));
+  ASSERT_EQ(1, map.count("foo", 1024));
+  ASSERT_EQ(1, map.count("foo", 1025));
+
+  multimap<string, uint16_t>::iterator i = map.begin();
+
+  ASSERT_TRUE(i != map.end());
+
+  ASSERT_EQ("foo", i->first);
+  ASSERT_EQ(1024, i->second);
+
+  ++i;
+  ASSERT_TRUE(i != map.end());
+
+  ASSERT_EQ("foo", i->first);
+  ASSERT_EQ(1025, i->second);
+
+  ++i;
+  ASSERT_TRUE(i == map.end());
+}
+
+
+TEST(Multimap, Foreach)
+{
+  multimap<string, uint16_t> map;
+
+  map.insert("foo", 1024);
+  map.insert("bar", 1025);
+  ASSERT_EQ(1, map.count("foo"));
+  ASSERT_EQ(1, map.count("foo"));
+  ASSERT_EQ(1, map.count("foo", 1024));
+  ASSERT_EQ(1, map.count("bar", 1025));
+
+  foreachpair (const string& key, uint16_t value, map) {
+    if (key == "foo") {
+      ASSERT_EQ(1024, value);
+    } else if (key == "bar") {
+      ASSERT_EQ(1025, value);
+    } else {
+      FAIL() << "Unexpected key/value in multimap";
+    }
+  }
+}

Added: incubator/mesos/trunk/src/tests/resource_offer_reply_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/resource_offer_reply_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/resource_offer_reply_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/resource_offer_reply_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,365 @@
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include "common/date_utils.hpp"
+
+#include "local/local.hpp"
+
+#include "master/master.hpp"
+
+#include "slave/isolation_module.hpp"
+#include "slave/process_based_isolation_module.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+using mesos::internal::slave::Framework;
+
+using process::PID;
+
+using std::string;
+using std::vector;
+
+
+/**
+ * These tests aren't using gmock right now, but at some point we
+ * might move them in that direction.
+ */
+class FixedResponseScheduler : public Scheduler
+{
+public:
+  vector<TaskDescription> response;
+  string errorMessage;
+  
+  FixedResponseScheduler(const vector<TaskDescription>& _response)
+    : response(_response) {}
+
+  virtual ~FixedResponseScheduler() {}
+
+  virtual string getFrameworkName(SchedulerDriver*)
+  {
+    return "Fixed Response Framework";
+  }
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    // TODO(benh): The following line crashes some Linux compilers. :(
+    // return DEFAULT_EXECUTOR_INFO;
+    ExecutorInfo executor;
+    executor.mutable_executor_id()->set_value("default");
+    executor.set_uri("noexecutor");
+    return executor;
+  }
+
+  virtual void registered(SchedulerDriver*, const FrameworkID&) {}
+
+  virtual void resourceOffer(SchedulerDriver* driver,
+                             const OfferID& offerId,
+                             const vector<SlaveOffer>& offers) {
+    LOG(INFO) << "FixedResponseScheduler got a slot offer";
+
+    driver->replyToOffer(offerId, response);
+  }
+
+  virtual void offerRescinded(SchedulerDriver* driver,
+                              const OfferID& offerId) {}
+
+  virtual void statusUpdate(SchedulerDriver* driver,
+                            const TaskStatus& status) {}
+
+  virtual void frameworkMessage(SchedulerDriver* driver,
+				const SlaveID& slaveId,
+				const ExecutorID& executorId,
+                                const string& data) {}
+
+  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {}
+
+  virtual void error(SchedulerDriver* driver, int code, const string& message) {
+    errorMessage = message;
+    driver->stop();
+  }
+};
+
+
+TEST(MasterTest, DuplicateTaskIdsInResponse)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(1);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(1 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+  tasks.push_back(task);
+
+  task.mutable_task_id()->set_value("2");
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Duplicate task ID: 1", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}
+
+
+TEST(MasterTest, TooMuchMemoryInTask)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(1);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(4 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}
+
+
+TEST(MasterTest, TooMuchCpuInTask)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(4);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(1 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}
+
+
+TEST(MasterTest, ZeroCpuInTask)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(0);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(1 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Invalid resources for task", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}
+
+
+TEST(MasterTest, TooMuchMemoryAcrossTasks)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(1);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(2 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+
+  task.mutable_task_id()->set_value("2");
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}
+
+
+TEST(MasterTest, TooMuchCpuAcrossTasks)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  DateUtils::setMockDate("200102030405");
+  PID<Master> master = local::launch(1, 3, 3 * Gigabyte, false, false);
+
+  Resources resources;
+
+  Resource cpus;
+  cpus.set_name("cpus");
+  cpus.set_type(Resource::SCALAR);
+  cpus.mutable_scalar()->set_value(2);
+
+  Resource mem;
+  mem.set_name("mem");
+  mem.set_type(Resource::SCALAR);
+  mem.mutable_scalar()->set_value(1 * Gigabyte);
+
+  resources += cpus;
+  resources += mem;
+
+  vector<TaskDescription> tasks;
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->set_value("200102030405-0-0");
+  task.mutable_resources()->MergeFrom(resources);
+
+  tasks.push_back(task);
+
+  task.mutable_task_id()->set_value("2");
+
+  tasks.push_back(task);
+
+  FixedResponseScheduler sched(tasks);
+  MesosSchedulerDriver driver(&sched, master);
+
+  driver.run();
+
+  EXPECT_EQ("Too many resources accepted", sched.errorMessage);
+
+  local::shutdown();
+  DateUtils::clearMockDate();
+}

Added: incubator/mesos/trunk/src/tests/resource_offer_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/resource_offer_tests.cpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/resource_offer_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/resource_offer_tests.cpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,246 @@
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include "local/local.hpp"
+
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::test;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::PID;
+
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::ElementsAre;
+using testing::Return;
+using testing::SaveArg;
+
+
+TEST(MasterTest, ResourceOfferWithMultipleSlaves)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID<Master> master = local::launch(10, 2, 1 * Gigabyte, false, false);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, master);
+
+  vector<SlaveOffer> offers;
+
+  trigger resourceOfferCall;
+
+  EXPECT_CALL(sched, getFrameworkName(&driver))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched, getExecutorInfo(&driver))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched, registered(&driver, _))
+    .Times(1);
+
+  EXPECT_CALL(sched, resourceOffer(&driver, _, _))
+    .WillOnce(DoAll(SaveArg<2>(&offers), Trigger(&resourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(AtMost(1));
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+  EXPECT_GE(10, offers.size());
+
+  Resources resources(offers[0].resources());
+  EXPECT_EQ(2, resources.getScalar("cpus", Resource::Scalar()).value());
+  EXPECT_EQ(1024, resources.getScalar("mem", Resource::Scalar()).value());
+
+  driver.stop();
+  driver.join();
+
+  local::shutdown();
+}
+
+
+TEST(MasterTest, ResourcesReofferedAfterReject)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  OfferID offerId;
+
+  trigger sched1ResourceOfferCall;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .Times(1);
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, _))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), Trigger(&sched1ResourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1ResourceOfferCall);
+
+  driver1.replyToOffer(offerId, vector<TaskDescription>());
+
+  driver1.stop();
+  driver1.join();
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, master);
+
+  trigger sched2ResourceOfferCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched2, registered(&driver2, _))
+    .Times(1);
+
+  EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+    .WillOnce(Trigger(&sched2ResourceOfferCall))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+    .Times(AtMost(1));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2ResourceOfferCall);
+
+  driver2.stop();
+  driver2.join();
+
+  local::shutdown();
+}
+
+
+TEST(MasterTest, ResourcesReofferedAfterBadResponse)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID<Master> master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(&sched1, master);
+
+  OfferID offerId;
+  vector<SlaveOffer> offers;
+
+  trigger sched1ResourceOfferCall;
+
+  EXPECT_CALL(sched1, getFrameworkName(&driver1))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched1, getExecutorInfo(&driver1))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched1, registered(&driver1, _))
+    .Times(1);
+
+  EXPECT_CALL(sched1, resourceOffer(&driver1, _, ElementsAre(_)))
+    .WillOnce(DoAll(SaveArg<1>(&offerId), SaveArg<2>(&offers),
+                    Trigger(&sched1ResourceOfferCall)))
+    .WillRepeatedly(Return());
+
+  driver1.start();
+
+  WAIT_UNTIL(sched1ResourceOfferCall);
+
+  EXPECT_NE(0, offers.size());
+
+  TaskDescription task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers[0].slave_id());
+
+  Resource* cpus = task.add_resources();
+  cpus->set_name("cpus");
+  cpus->set_type(Resource::SCALAR);
+  cpus->mutable_scalar()->set_value(0);
+
+  Resource* mem = task.add_resources();
+  mem->set_name("mem");
+  mem->set_type(Resource::SCALAR);
+  mem->mutable_scalar()->set_value(1 * Gigabyte);
+
+  vector<TaskDescription> tasks;
+  tasks.push_back(task);
+
+  trigger sched1ErrorCall;
+
+  EXPECT_CALL(sched1,
+              error(&driver1, _, "Invalid resources for task"))
+    .WillOnce(Trigger(&sched1ErrorCall));
+
+  EXPECT_CALL(sched1, offerRescinded(&driver1, offerId))
+    .Times(AtMost(1));
+
+  driver1.replyToOffer(offerId, tasks);
+
+  WAIT_UNTIL(sched1ErrorCall);
+
+  driver1.stop();
+  driver1.join();
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(&sched2, master);
+
+  trigger sched2ResourceOfferCall;
+
+  EXPECT_CALL(sched2, getFrameworkName(&driver2))
+    .WillOnce(Return(""));
+
+  EXPECT_CALL(sched2, getExecutorInfo(&driver2))
+    .WillOnce(Return(DEFAULT_EXECUTOR_INFO));
+
+  EXPECT_CALL(sched2, registered(&driver2, _))
+    .Times(1);
+
+  EXPECT_CALL(sched2, resourceOffer(&driver2, _, _))
+    .WillOnce(Trigger(&sched2ResourceOfferCall))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
+    .Times(AtMost(1));
+
+  driver2.start();
+
+  WAIT_UNTIL(sched2ResourceOfferCall);
+
+  driver2.stop();
+  driver2.join();
+
+  local::shutdown();
+}