You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/06/04 00:47:55 UTC

[1/2] git commit: Introduced a low-level scheduler API.

Repository: mesos
Updated Branches:
  refs/heads/master b4ec4eb98 -> 992ee1f65


Introduced a low-level scheduler API.

Review: https://reviews.apache.org/r/20309


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/992ee1f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/992ee1f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/992ee1f6

Branch: refs/heads/master
Commit: 992ee1f651195e591c38a34d0df0829f5e79497a
Parents: c7adb9d
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Mar 17 22:42:05 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Jun 3 15:47:20 2014 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto               |   5 +-
 include/mesos/scheduler.hpp             |  83 ++-
 include/mesos/scheduler/scheduler.hpp   |  25 +
 include/mesos/scheduler/scheduler.proto | 190 +++++++
 src/Makefile.am                         |  48 +-
 src/sched/sched.cpp                     |   5 +-
 src/scheduler/scheduler.cpp             | 820 +++++++++++++++++++++++++++
 src/tests/master_tests.cpp              |  17 +-
 src/tests/scheduler_tests.cpp           | 156 ++++-
 9 files changed, 1324 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 82388e1..62f69d2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -494,6 +494,8 @@ enum TaskState {
   TASK_FAILED = 3;   // TERMINAL.
   TASK_KILLED = 4;   // TERMINAL.
   TASK_LOST = 5;     // TERMINAL.
+
+  // TODO(benh): TASK_ERROR = 7; // TERMINAL.
 }
 
 
@@ -506,6 +508,7 @@ message TaskStatus {
   optional string message = 4; // Possible message explaining state.
   optional bytes data = 3;
   optional SlaveID slave_id = 5;
+  optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.
   optional double timestamp = 6;
 }
 
@@ -629,7 +632,7 @@ message ACL {
 }
 
 
-/*
+/**
  * Collection of ACL.
  *
  * Each authorization request is evaluated against the ACLs in the order

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 85db111..a17db59 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,6 +19,14 @@
 #ifndef __MESOS_SCHEDULER_HPP__
 #define __MESOS_SCHEDULER_HPP__
 
+#if __cplusplus >= 201103L
+#include <functional>
+#else // __cplusplus >= 201103L
+#include <tr1/functional>
+#endif // __cplusplus >= 201103L
+
+#include <queue>
+
 #include <pthread.h>
 
 #include <string>
@@ -26,6 +34,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 
 /**
  * Mesos scheduler interface and scheduler driver. A scheduler is used
@@ -41,10 +51,14 @@ namespace mesos {
 // A few forward declarations.
 class SchedulerDriver;
 
+namespace scheduler {
+class MesosProcess;
+} // namespace scheduler {
+
 namespace internal {
 class MasterDetector;
 class SchedulerProcess;
-}
+} // namespace internal {
 
 
 /**
@@ -428,6 +442,73 @@ protected:
   internal::MasterDetector* detector;
 };
 
+
+namespace scheduler {
+
+/**
+ * Interface to Mesos for a scheduler. Abstracts master detection
+ * (connection and disconnection) and authentication if some
+ * credentials are provided.
+ *
+ * Expects three callbacks, 'connected', 'disconnected', and
+ * 'received' which will get invoked _serially_ when it's determined
+ * that we've connected, disconnected, or received events from the
+ * master. Note that we drop events while disconnected but it's
+ * possible to receive a batch of events across a
+ * disconnected/connected transition before getting the disconnected
+ * and then connected callback.
+ * TODO(benh): Don't include events in 'received' that occurred after a
+ * disconnected/connected transition.
+ **/
+class Mesos
+{
+public:
+  Mesos(const std::string& master,
+#if __cplusplus >= 201103L
+        const std::function<void(void)>& connected,
+        const std::function<void(void)>& disconnected,
+        const std::function<void(const std::queue<Event>&)>& received);
+#else // __cplusplus >= 201103L
+        const std::tr1::function<void(void)>& connected,
+        const std::tr1::function<void(void)>& disconnected,
+        const std::tr1::function<void(const std::queue<Event>&)>& received);
+#endif // __cplusplus >= 201103L
+
+  /**
+   * Same as the above constructor but takes 'credential' as argument.
+   *
+   * The credential will be used for authenticating with the master.
+   *
+   **/
+  Mesos(const std::string& master,
+        const Credential& credential,
+#if __cplusplus >= 201103L
+        const std::function<void(void)>& connected,
+        const std::function<void(void)>& disconnected,
+        const std::function<void(const std::queue<Event>&)>& received);
+#else // __cplusplus >= 201103L
+        const std::tr1::function<void(void)>& connected,
+        const std::tr1::function<void(void)>& disconnected,
+        const std::tr1::function<void(const std::queue<Event>&)>& received);
+#endif // __cplusplus >= 201103L
+
+  virtual ~Mesos();
+
+  /**
+   * Attempts to send a call to the master.
+   *
+   * Some local validation of calls is performed which may generate
+   * events without ever being sent to the master. This includes when
+   * calls are sent but no master is currently detected (i.e., we're
+   * disconnected).
+   */
+  virtual void send(const Call& call);
+
+private:
+  MesosProcess* process;
+};
+
+} // namespace scheduler {
 } // namespace mesos {
 
 #endif // __MESOS_SCHEDULER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.hpp b/include/mesos/scheduler/scheduler.hpp
new file mode 100644
index 0000000..7aebebf
--- /dev/null
+++ b/include/mesos/scheduler/scheduler.hpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SCHEDULER_PROTO_HPP__
+#define __SCHEDULER_PROTO_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <scheduler/scheduler.pb.h>
+
+#endif // __SCHEDULER_PROTO_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
new file mode 100644
index 0000000..4deda55
--- /dev/null
+++ b/include/mesos/scheduler/scheduler.proto
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos.proto";
+
+package mesos.scheduler;
+
+option java_package = "org.apache.mesos.scheduler";
+option java_outer_classname = "Protos";
+
+
+/**
+ * Low-level scheduler event API.
+ *
+ * An event is described using the standard protocol buffer "union"
+ * trick, see https://developers.google.com/protocol-buffers/docs/techniques#union.
+ */
+message Event {
+  // Possible event types, followed by message definitions if
+  // applicable.
+  enum Type {
+    REGISTERED = 1;
+    REREGISTERED = 2;
+    OFFERS = 3;
+    RESCIND = 4;
+    UPDATE = 5;
+    MESSAGE = 6;
+    FAILURE = 7;
+    ERROR = 8;
+  }
+
+  message Registered {
+    required FrameworkID framework_id = 1;
+    required MasterInfo master_info = 2;
+  }
+
+  message Reregistered {
+    required FrameworkID framework_id = 1;
+    required MasterInfo master_info = 2;
+  }
+
+  message Offers {
+    repeated Offer offers = 1;
+  }
+
+  message Rescind {
+    required OfferID offer_id = 1;
+  }
+
+  message Update {
+    required bytes uuid = 1; // TODO(benh): Replace with UpdateID.
+    required TaskStatus status = 2;
+  }
+
+  message Message {
+    required SlaveID slave_id = 1;
+    required ExecutorID executor_id = 2;
+    required bytes data = 3;
+  }
+
+  message Failure {
+    optional SlaveID slave_id = 1;
+
+    // If this was just a failure of an executor on a slave then
+    // 'executor_id' will be set and possibly 'status' (if we were
+    // able to determine the exit status).
+    optional ExecutorID executor_id = 2;
+    optional int32 status = 3;
+  }
+
+  message Error {
+    required string message = 1;
+  }
+
+  // TODO(benh): Add a 'from' or 'sender'.
+
+  // Type of the event, indicates which optional field below should be
+  // present if that type has a nested message definition.
+  required Type type = 1;
+
+  optional Registered registered = 2;
+  optional Reregistered reregistered = 3;
+  optional Offers offers = 4;
+  optional Rescind rescind = 5;
+  optional Update update = 6;
+  optional Message message = 7;
+  optional Failure failure = 8;
+  optional Error error = 9;
+}
+
+
+/**
+ * Low-level scheduler call API.
+ *
+ * Like Event, a Call is described using the standard protocol buffer
+ * "union" trick (see above).
+ */
+message Call {
+  // Possible call types, followed by message definitions if
+  // applicable.
+  enum Type {
+    REGISTER = 1;
+    REREGISTER = 2;
+    UNREGISTER = 3;
+    REQUEST = 4;
+    DECLINE = 5;
+    REVIVE = 6;
+    LAUNCH = 7;
+    KILL = 8;
+    ACKNOWLEDGE = 9;
+    RECONCILE = 10;
+    MESSAGE = 11;
+
+    // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
+    // already registered frameworks as a way of stopping offers from
+    // being generated and other events from being sent by the master.
+    // Note that this functionality existed originally to support
+    // SchedulerDriver::abort which was only necessary to handle
+    // exceptions getting thrown from within Scheduler callbacks,
+    // something that is not an issue with the Event/Call API.
+  }
+
+  message Request {
+    repeated mesos.Request requests = 1;
+  }
+
+  message Decline {
+    repeated OfferID offer_ids = 1;
+    optional Filters filters = 2;
+  }
+
+  message Launch {
+    repeated TaskInfo task_infos = 1;
+    repeated OfferID offer_ids = 2;
+    optional Filters filters = 3;
+  }
+
+  message Kill {
+    required TaskID task_id = 1;
+  }
+
+  message Acknowledge {
+    required SlaveID slave_id = 1;
+    required TaskID task_id = 2;
+    required bytes uuid = 3;
+  }
+
+  message Reconcile {
+    // TODO(benh): Send TaskID's instead, see MESOS-1453.
+    repeated TaskStatus statuses = 1;
+  }
+
+  message Message {
+    required SlaveID slave_id = 1;
+    required ExecutorID executor_id = 2;
+    required bytes data = 3;
+  }
+
+  // Identifies who generated this call. Always necessary, but the
+  // only thing that needs to be set for certain calls, e.g.,
+  // REGISTER, REREGISTER, and UNREGISTER.
+  required FrameworkInfo framework_info = 1;
+
+  // Type of the call, indicates which optional field below should be
+  // present if that type has a nested message definition.
+  required Type type = 2;
+
+  optional Request request = 3;
+  optional Decline decline = 4;
+  optional Launch launch = 5;
+  optional Kill kill = 6;
+  optional Acknowledge acknowledge = 7;
+  optional Reconcile reconcile = 8;
+  optional Message message = 9;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index ffde59b..c3ecb94 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -110,19 +110,26 @@ endif
 MESOS_PROTO = $(top_srcdir)/include/mesos/mesos.proto
 
 CONTAINERIZER_PROTO =							\
-	$(top_srcdir)/include/mesos/containerizer/containerizer.proto
+  $(top_srcdir)/include/mesos/containerizer/containerizer.proto
+
+SCHEDULER_PROTO =							\
+  $(top_srcdir)/include/mesos/scheduler/scheduler.proto
 
 CXX_PROTOS =								\
-	mesos.pb.cc mesos.pb.h						\
-	containerizer/containerizer.pb.cc containerizer/containerizer.pb.h
+  mesos.pb.cc								\
+  mesos.pb.h								\
+  containerizer/containerizer.pb.cc					\
+  containerizer/containerizer.pb.h					\
+  scheduler/scheduler.pb.cc						\
+  scheduler/scheduler.pb.h
 
 JAVA_PROTOS =								\
-	java/generated/org/apache/mesos/Protos.java			\
-	java/generated/org/apache/mesos/containerizer/Protos.java
+  java/generated/org/apache/mesos/Protos.java				\
+  java/generated/org/apache/mesos/containerizer/Protos.java
 
 PYTHON_PROTOS =								\
-	python/src/mesos_pb2.py						\
-	python/src/containerizer_pb2.py
+  python/src/mesos_pb2.py						\
+  python/src/containerizer_pb2.py
 
 BUILT_SOURCES += $(CXX_PROTOS) $(JAVA_PROTOS) $(PYTHON_PROTOS)
 CLEANFILES += $(CXX_PROTOS) $(JAVA_PROTOS) $(PYTHON_PROTOS)
@@ -156,6 +163,10 @@ containerizer/%.pb.cc containerizer/%.pb.h: $(CONTAINERIZER_PROTO)
 	$(MKDIR_P) $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
 
+scheduler/%.pb.cc scheduler/%.pb.h: $(SCHEDULER_PROTO)
+	$(MKDIR_P) $(@D)
+	$(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
+
 %.pb.cc %.pb.h: %.proto
 	$(MKDIR_P) $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
@@ -169,6 +180,10 @@ java/generated/org/apache/mesos/containerizer/Protos.java:		\
 	$(MKDIR_P)  $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
 
+java/generated/org/apache/mesos/scheduler/Protos.java: $(SCHEDULER_PROTO)
+	$(MKDIR_P)  $(@D)
+	$(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+
 python/src/mesos_pb2.py: $(MESOS_PROTO)
 	$(MKDIR_P) $(@D)
 	$(PROTOC) $(PROTOCFLAGS) --python_out=python/src $^
@@ -178,6 +193,11 @@ python/src/containerizer_pb2.py: $(CONTAINERIZER_PROTO)
 	$(PROTOC) -I$(top_srcdir)/include/mesos/containerizer		\
 		$(PROTOCFLAGS) --python_out=python/src $^
 
+python/src/scheduler_pb2.py: $(SCHEDULER_PROTO)
+	$(MKDIR_P) $(@D)
+	$(PROTOC) -I$(top_srcdir)/include/mesos/scheduler		\
+		$(PROTOCFLAGS) --python_out=python/src $^
+
 # We even use a convenience library for most of Mesos so that we can
 # exclude third party libraries so setuptools/distribute can build a
 # self-contained Python library and statically link in the third party
@@ -196,6 +216,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	sasl/auxprop.hpp						\
 	sasl/auxprop.cpp						\
 	sched/sched.cpp							\
+	scheduler/scheduler.cpp						\
 	local/local.cpp							\
 	master/contender.cpp						\
 	master/constants.cpp						\
@@ -253,8 +274,15 @@ containerizer_HEADERS =							\
   $(top_srcdir)/include/mesos/containerizer/containerizer.hpp		\
   $(top_srcdir)/include/mesos/containerizer/containerizer.proto
 
-nodist_containerizer_HEADERS =						\
-  containerizer/containerizer.pb.h
+nodist_containerizer_HEADERS = containerizer/containerizer.pb.h
+
+schedulerdir = $(pkgincludedir)/scheduler
+
+scheduler_HEADERS =							\
+  $(top_srcdir)/include/mesos/scheduler/scheduler.hpp			\
+  $(top_srcdir)/include/mesos/scheduler/scheduler.proto
+
+nodist_scheduler_HEADERS = scheduler/scheduler.pb.h
 
 if OS_LINUX
   libmesos_no_3rdparty_la_SOURCES += linux/cgroups.cpp
@@ -453,7 +481,7 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la
 lib_LTLIBRARIES += libmesos.la
 
 # Include as part of the distribution.
-libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO)
+libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(SCHEDULER_PROTO)
 
 libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION) -shared
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index be23e01..b27222e 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -56,6 +56,7 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/stopwatch.hpp>
+#include <stout/utils.hpp>
 #include <stout/uuid.hpp>
 
 #include "sasl/authenticatee.hpp"
@@ -86,6 +87,7 @@ using std::vector;
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
+using utils::copy;
 
 namespace mesos {
 namespace internal {
@@ -264,8 +266,7 @@ protected:
       // are here, making the 'discard' here a no-op. This is ok
       // because we set 'reauthenticate' here which enforces a retry
       // in '_authenticate'.
-      Future<bool> authenticating_ = authenticating.get();
-      authenticating_.discard();
+      copy(authenticating.get()).discard();
       reauthenticate = true;
       return;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
new file mode 100644
index 0000000..9e8975e
--- /dev/null
+++ b/src/scheduler/scheduler.cpp
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <dlfcn.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/async.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/mutex.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/error.hpp>
+#include <stout/flags.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+#include "sasl/authenticatee.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "master/detector.hpp"
+
+#include "local/local.hpp"
+
+#include "master/detector.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+#include "messages/messages.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::master;
+
+using namespace process;
+
+using std::queue;
+using std::string;
+using std::vector;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+namespace mesos {
+namespace scheduler {
+
+// The process (below) is responsible for receiving messages
+// (eventually events) from the master and sending messages (via
+// calls) to the master.
+class MesosProcess : public ProtobufProcess<MesosProcess>
+{
+public:
+  MesosProcess(
+      const string& master,
+      const Option<Credential>& _credential,
+      const lambda::function<void(void)>& _connected,
+      const lambda::function<void(void)>& _disconnected,
+      lambda::function<void(const queue<Event>&)> _received)
+    : ProcessBase(ID::generate("scheduler")),
+      credential(_credential),
+      connected(_connected),
+      disconnected(_disconnected),
+      received(_received),
+      local(false),
+      failover(true),
+      detector(NULL),
+      authenticatee(NULL),
+      authenticating(None()),
+      authenticated(false),
+      reauthenticate(false)
+  {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+    // Load any flags from the environment (we use local::Flags in the
+    // event we run in 'local' mode, since it inherits
+    // logging::Flags). In the future, just as the TODO in
+    // local/main.cpp discusses, we'll probably want a way to load
+    // master::Flags and slave::Flags as well.
+    local::Flags flags;
+
+    Try<Nothing> load = flags.load("MESOS_");
+
+    if (load.isError()) {
+      error("Failed to load flags: " + load.error());
+      return;
+    }
+
+    // Initialize libprocess (done here since at some point we might
+    // want to use flags to initialize libprocess).
+    process::initialize();
+
+    logging::initialize("mesos", flags);
+
+    LOG(INFO) << "Version: " << MESOS_VERSION;
+
+    // Launch a local cluster if necessary.
+    Option<UPID> pid = None();
+    if (master == "local") {
+      pid = local::launch(flags);
+      local = true;
+    }
+
+    Try<MasterDetector*> detector_ =
+      MasterDetector::create(pid.isSome() ? string(pid.get()) : master);
+
+    if (detector_.isError()) {
+      error("Failed to create a master detector: " + detector_.error());
+      return;
+    }
+
+    // Save the detector so we can delete it later.
+    detector = detector_.get();
+  }
+
+  virtual ~MesosProcess()
+  {
+    delete authenticatee;
+
+    // Check and see if we need to shutdown a local cluster.
+    if (local) {
+      local::shutdown();
+    }
+
+    // Wait for any callbacks to finish.
+    mutex.lock().await();
+  }
+
+  // TODO(benh): Move this to 'protected'.
+  using ProtobufProcess<MesosProcess>::send;
+
+  void send(Call call)
+  {
+    if (master.isNone()) {
+      drop(call, "Disconnected");
+      return;
+    }
+
+    // If no user was specified in FrameworkInfo, use the current user.
+    // TODO(benh): Make FrameworkInfo.user be optional and add a
+    // 'user' to either TaskInfo or CommandInfo.
+    if (call.framework_info().user() == "") {
+      call.mutable_framework_info()->set_user(os::user());
+    }
+
+    // Only a REGISTER should not have set the framework ID.
+    if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
+      drop(call, "Call is missing FrameworkInfo.id");
+      return;
+    }
+
+    if (!call.IsInitialized()) {
+      drop(call, "Call is not properly initialized: " +
+           call.InitializationErrorString());
+      return;
+    }
+
+    switch (call.type()) {
+      case Call::REGISTER: {
+        RegisterFrameworkMessage message;
+        message.mutable_framework()->CopyFrom(call.framework_info());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REREGISTER: {
+        ReregisterFrameworkMessage message;
+        message.mutable_framework()->CopyFrom(call.framework_info());
+        message.set_failover(failover);
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::UNREGISTER: {
+        UnregisterFrameworkMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REQUEST: {
+        ResourceRequestMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_requests()->CopyFrom(call.request().requests());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::DECLINE: {
+        LaunchTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_filters()->CopyFrom(call.decline().filters());
+        message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REVIVE: {
+        ReviveOffersMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::LAUNCH: {
+        // We do some local validation here, but really this should
+        // all happen in the master so it's only implemented once.
+        vector<TaskInfo> tasks;
+
+        foreach (const TaskInfo& task, call.launch().task_infos()) {
+          // Check that each TaskInfo has either an ExecutorInfo or a
+          // CommandInfo but not both.
+          if (task.has_executor() == task.has_command()) {
+            drop(task,
+                 "TaskInfo must have either an 'executor' or a 'command'");
+            continue;
+          }
+
+          // Ensure ExecutorInfo::framework_id is valid, if present.
+          if (task.has_executor() &&
+              task.executor().has_framework_id() &&
+              !(task.executor().framework_id() == call.framework_info().id())) {
+            drop(task,
+                 "ExecutorInfo has an invalid FrameworkID (Actual: " +
+                 stringify(task.executor().framework_id()) + " vs Expected: " +
+                 stringify(call.framework_info().id()) + ")");
+            continue;
+          }
+
+          tasks.push_back(task);
+
+          // Set ExecutorInfo::framework_id if missing since this
+          // field was added to the API later and thus was made
+          // optional.
+          if (task.has_executor() && !task.executor().has_framework_id()) {
+            tasks.back().mutable_executor()->mutable_framework_id()->CopyFrom(
+                call.framework_info().id());
+          }
+        }
+
+        LaunchTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_filters()->CopyFrom(call.launch().filters());
+        message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids());
+
+        foreach (const TaskInfo& task, tasks) {
+          message.add_tasks()->CopyFrom(task);
+        }
+
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::KILL: {
+        KillTaskMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_task_id()->CopyFrom(call.kill().task_id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::ACKNOWLEDGE: {
+        StatusUpdateAcknowledgementMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
+        message.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
+        message.set_uuid(call.acknowledge().uuid());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::RECONCILE: {
+        ReconcileTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::MESSAGE: {
+        FrameworkToExecutorMessage message;
+        message.mutable_slave_id()->CopyFrom(call.message().slave_id());
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_executor_id()->CopyFrom(call.message().executor_id());
+        message.set_data(call.message().data());
+        send(master.get(), message);
+        break;
+      }
+
+      default:
+        VLOG(1) << "Unexpected call " << stringify(call.type());
+        break;
+    }
+  }
+
+protected:
+  virtual void initialize()
+  {
+    install<FrameworkRegisteredMessage>(&MesosProcess::receive);
+    install<FrameworkReregisteredMessage>(&MesosProcess::receive);
+    install<ResourceOffersMessage>(&MesosProcess::receive);
+    install<RescindResourceOfferMessage>(&MesosProcess::receive);
+    install<StatusUpdateMessage>(&MesosProcess::receive);
+    install<LostSlaveMessage>(&MesosProcess::receive);
+    install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
+    install<FrameworkErrorMessage>(&MesosProcess::receive);
+
+    // Start detecting masters.
+    detector->detect()
+      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
+  }
+
+  void detected(const Future<Option<MasterInfo> >& future)
+  {
+    CHECK(!future.isDiscarded());
+
+    if (future.isFailed()) {
+      error("Failed to detect a master: " + future.failure());
+      return;
+    }
+
+    if (future.get().isNone()) {
+      master = None();
+
+      VLOG(1) << "No master detected";
+
+      mutex.lock()
+        .then(defer(self(), &Self::_detected))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    } else {
+      master = UPID(future.get().get().pid());
+
+      VLOG(1) << "New master detected at " << master.get();
+
+      if (credential.isSome()) {
+        // Authenticate with the master.
+        authenticate();
+      } else {
+        mutex.lock()
+          .then(defer(self(), &Self::__detected))
+          .onAny(lambda::bind(&Mutex::unlock, mutex));
+      }
+    }
+
+    // Keep detecting masters.
+    detector->detect(future.get())
+      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
+  }
+
+  Future<Nothing> _detected()
+  {
+    return async(disconnected);
+  }
+
+  Future<Nothing> __detected()
+  {
+    return async(connected);
+  }
+
+  void authenticate()
+  {
+    authenticated = false;
+
+    // We retry to authenticate and it's possible that we'll get
+    // disconnected while that is happening.
+    if (master.isNone()) {
+      return;
+    }
+
+    if (authenticating.isSome()) {
+      // Authentication is in progress. Try to cancel it.
+      // Note that it is possible that 'authenticating' is ready
+      // and the dispatch to '_authenticate' is enqueued when we
+      // are here, making the 'discard' here a no-op. This is ok
+      // because we set 'reauthenticate' here which enforces a retry
+      // in '_authenticate'.
+      Future<bool>(authenticating.get()).discard();
+      reauthenticate = true;
+      return;
+    }
+
+    VLOG(1) << "Authenticating with master " << master.get();
+
+    CHECK_SOME(credential);
+
+    CHECK(authenticatee == NULL);
+    authenticatee = new sasl::Authenticatee(credential.get(), self());
+
+    // NOTE: We do not pass 'Owned<Authenticatee>' here because doing
+    // so could make 'AuthenticateeProcess' responsible for deleting
+    // 'Authenticatee' causing a deadlock because the destructor of
+    // 'Authenticatee' waits on 'AuthenticateeProcess'.
+    // This will happen in the following scenario:
+    // --> 'AuthenticateeProcess' does a 'Future.set()'.
+    // --> '_authenticate()' is dispatched to this process.
+    // --> This process executes '_authenticate()'.
+    // --> 'AuthenticateeProcess' removes the onAny callback
+    //     from its queue which holds the last reference to
+    //     'Authenticatee'.
+    // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
+    // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
+    authenticating = authenticatee->authenticate(master.get())
+      .onAny(defer(self(), &Self::_authenticate));
+
+    delay(Seconds(5),
+          self(),
+          &Self::authenticationTimeout,
+          authenticating.get());
+  }
+
+  // Completion handler for the attempt started in 'authenticate()'.
+  //
+  // Deletes the authenticatee and then, depending on the state:
+  //   - no master detected: drops the attempt without retrying;
+  //   - 'reauthenticate' set or the future not ready: retries by
+  //     dispatching 'authenticate';
+  //   - master refused: injects an ERROR event;
+  //   - success: marks us authenticated and invokes 'connected' while
+  //     holding the callback mutex.
+  //
+  // In every case 'authenticating' is cleared so that a subsequent
+  // 'authenticate()' can start a fresh attempt.
+  void _authenticate()
+  {
+    delete CHECK_NOTNULL(authenticatee);
+    authenticatee = NULL;
+
+    CHECK_SOME(authenticating);
+
+    // NOTE: This is a reference into 'authenticating', so it must not
+    // be used after 'authenticating' is reset below.
+    const Future<bool>& future = authenticating.get();
+
+    if (master.isNone()) {
+      VLOG(1) << "Ignoring authentication because no master is detected";
+      authenticating = None();
+
+      // Set it to false because we do not want further retries until
+      // a new master is detected.
+      // We obviously do not need to reauthenticate either even if
+      // 'reauthenticate' is currently true because the master is
+      // lost.
+      reauthenticate = false;
+      return;
+    }
+
+    if (reauthenticate || !future.isReady()) {
+      VLOG(1)
+        << "Failed to authenticate with master " << master.get() << ": "
+        << (reauthenticate ? "master changed" :
+           (future.isFailed() ? future.failure() : "future discarded"));
+
+      authenticating = None();
+      reauthenticate = false;
+
+      // TODO(vinod): Add a limit on number of retries.
+      dispatch(self(), &Self::authenticate); // Retry.
+      return;
+    }
+
+    if (!future.get()) {
+      VLOG(1) << "Master " << master.get() << " refused authentication";
+
+      // This attempt is over. Clear it so that a later 'authenticate()'
+      // (e.g., after a new master is detected) starts a fresh attempt
+      // instead of taking the "in progress" path and waiting for an
+      // '_authenticate' that will never be dispatched.
+      authenticating = None();
+
+      error("Authentication refused");
+      return;
+    }
+
+    VLOG(1) << "Successfully authenticated with master " << master.get();
+
+    authenticated = true;
+    authenticating = None();
+
+    mutex.lock()
+      .then(defer(self(), &Self::__authenticate))
+      .onAny(lambda::bind(&Mutex::unlock, mutex));
+  }
+
+  // Runs the 'connected' callback via 'async'. Chained after
+  // 'mutex.lock()' once authentication has succeeded.
+  Future<Nothing> __authenticate()
+  {
+    Future<Nothing> invoked = async(connected);
+    return invoked;
+  }
+
+  // Fires when the timer armed in 'authenticate()' expires. 'future' is
+  // the attempt the timer was started for.
+  void authenticationTimeout(Future<bool> future)
+  {
+    // NOTE: Discarded future results in a retry in '_authenticate()'.
+    // Also note that a 'discard' here is safe even if another
+    // authenticator is in progress because this copy of the future
+    // corresponds to the original authenticator that started the timer.
+    const bool discarded =
+      future.discard(); // This is a no-op if the future is already ready.
+
+    if (!discarded) {
+      return;
+    }
+
+    LOG(WARNING) << "Authentication timed out";
+  }
+
+  // Enqueues 'event' for delivery through the 'received' callback.
+  //
+  // An event from a remote sender is ignored when:
+  //   - no master is detected, unless the event is a REGISTERED or
+  //     REREGISTERED; or
+  //   - a master is detected but the sender is not that master.
+  //
+  // NOTE: A None 'from' is possible when an event is injected locally.
+  void receive(const Option<UPID>& from, const Event& event)
+  {
+    // Check if we're disconnected but received an event.
+    if (from.isSome() && master.isNone()) {
+      // Ignore the event unless it's a registered or reregistered.
+      if (event.type() != Event::REGISTERED &&
+          event.type() != Event::REREGISTERED) {
+        VLOG(1) << "Ignoring " << stringify(event.type())
+                << " event because we're disconnected";
+        return;
+      }
+    } else if (from.isSome() && master != from) {
+      VLOG(1)
+        << "Ignoring " << stringify(event.type())
+        << " event because it was sent from '" << from.get()
+        << "' instead of the leading master '" << master.get() << "'";
+      return;
+    }
+
+    // Note that if 'from' is None we're locally injecting this event
+    // so we always want to enqueue it even if we're not connected!
+
+    // NOTE: The sender is stringified so that both arms of the
+    // conditional are strings. Mixing the literal with a 'UPID' makes
+    // the literal get implicitly converted to a 'UPID', in which case
+    // "(locally injected)" is not logged as written.
+    VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from "
+            << (from.isNone() ? "(locally injected)" : stringify(from.get()));
+
+    // Queue up the event and invoke the 'received' callback if this
+    // is the first event (between now and when the 'received'
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), &Self::_receive))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
+  }
+
+  // Hands every event queued so far to the 'received' callback via
+  // 'async' and then starts over with an empty queue. Chained after
+  // 'mutex.lock()' by 'receive' when the first event is enqueued.
+  Future<Nothing> _receive()
+  {
+    // The callback must be given the events before the queue is reset
+    // below (presumably 'async' takes its own copy -- TODO confirm).
+    Future<Nothing> future = async(received, events);
+    events = queue<Event>();
+    return future;
+  }
+
+  // Translates a 'FrameworkRegisteredMessage' into a REGISTERED event
+  // and enqueues it.
+  void receive(const UPID& from, const FrameworkRegisteredMessage& message)
+  {
+    // We've now registered at least once with the master so we're no
+    // longer failing over. See the comment where 'failover' is
+    // declared for further details.
+    failover = false;
+
+    Event registration;
+    registration.set_type(Event::REGISTERED);
+
+    registration.mutable_registered()->mutable_framework_id()->CopyFrom(
+        message.framework_id());
+
+    registration.mutable_registered()->mutable_master_info()->CopyFrom(
+        message.master_info());
+
+    receive(from, registration);
+  }
+
+  // Translates a 'FrameworkReregisteredMessage' into a REREGISTERED
+  // event and enqueues it.
+  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
+  {
+    // We've now registered at least once with the master so we're no
+    // longer failing over. See the comment where 'failover' is
+    // declared for further details.
+    failover = false;
+
+    Event reregistration;
+    reregistration.set_type(Event::REREGISTERED);
+
+    reregistration.mutable_reregistered()->mutable_framework_id()->CopyFrom(
+        message.framework_id());
+
+    reregistration.mutable_reregistered()->mutable_master_info()->CopyFrom(
+        message.master_info());
+
+    receive(from, reregistration);
+  }
+
+  // Translates a 'ResourceOffersMessage' into an OFFERS event carrying
+  // a copy of the offers and enqueues it.
+  void receive(const UPID& from, const ResourceOffersMessage& message)
+  {
+    Event offered;
+    offered.set_type(Event::OFFERS);
+
+    offered.mutable_offers()->mutable_offers()->CopyFrom(message.offers());
+
+    receive(from, offered);
+  }
+
+  // Translates a 'RescindResourceOfferMessage' into a RESCIND event for
+  // the same offer ID and enqueues it.
+  void receive(const UPID& from, const RescindResourceOfferMessage& message)
+  {
+    Event rescinded;
+    rescinded.set_type(Event::RESCIND);
+
+    rescinded.mutable_rescind()->mutable_offer_id()->CopyFrom(
+        message.offer_id());
+
+    receive(from, rescinded);
+  }
+
+  // Translates a 'StatusUpdateMessage' into an UPDATE event and
+  // enqueues it. The slave ID, executor ID (when present) and the
+  // timestamp of the update are folded into the event's task status.
+  void receive(const UPID& from, const StatusUpdateMessage& message)
+  {
+    Event updated;
+    updated.set_type(Event::UPDATE);
+
+    TaskStatus* status = updated.mutable_update()->mutable_status();
+
+    status->CopyFrom(message.update().status());
+
+    if (message.update().has_slave_id()) {
+      status->mutable_slave_id()->CopyFrom(message.update().slave_id());
+    }
+
+    if (message.update().has_executor_id()) {
+      status->mutable_executor_id()->CopyFrom(message.update().executor_id());
+    }
+
+    status->set_timestamp(message.update().timestamp());
+
+    updated.mutable_update()->set_uuid(message.update().uuid());
+
+    receive(from, updated);
+  }
+
+  // Translates a 'LostSlaveMessage' into a FAILURE event for that slave
+  // and enqueues it.
+  void receive(const UPID& from, const LostSlaveMessage& message)
+  {
+    Event failed;
+    failed.set_type(Event::FAILURE);
+
+    failed.mutable_failure()->mutable_slave_id()->CopyFrom(message.slave_id());
+
+    receive(from, failed);
+  }
+
+  // Translates an 'ExecutorToFrameworkMessage' into a MESSAGE event
+  // (slave ID, executor ID and data) and enqueues it.
+  void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
+  {
+    Event event;
+    event.set_type(Event::MESSAGE);
+
+    Event::Message* payload = event.mutable_message();
+
+    payload->mutable_slave_id()->CopyFrom(_message.slave_id());
+    payload->mutable_executor_id()->CopyFrom(_message.executor_id());
+    payload->set_data(_message.data());
+
+    receive(from, event);
+  }
+
+  void receive(const UPID& from, const FrameworkErrorMessage& message)
+  {
+    Event event;
+    event.set_type(Event::ERROR);
+
+    Event::Error* error = event.mutable_error();
+
+    error->set_message(message.message());
+
+    receive(from, event);
+  }
+
+  // Helper for injecting an ERROR event.
+  void error(const string& message)
+  {
+    Event event;
+
+    event.set_type(Event::ERROR);
+
+    Event::Error* error = event.mutable_error();
+
+    error->set_message(message);
+
+    receive(None(), event);
+  }
+
+  // Helper for "dropping" a task that was launched: locally injects an
+  // UPDATE event that reports the task as TASK_LOST with 'message' as
+  // the reason, the current time, and a freshly generated UUID.
+  void drop(const TaskInfo& task, const string& message)
+  {
+    Event lost;
+    lost.set_type(Event::UPDATE);
+
+    TaskStatus* status = lost.mutable_update()->mutable_status();
+
+    status->mutable_task_id()->CopyFrom(task.task_id());
+    status->set_state(TASK_LOST);
+    status->set_message(message);
+    status->set_timestamp(Clock::now().secs());
+
+    lost.mutable_update()->set_uuid(UUID::random().toBytes());
+
+    receive(None(), lost);
+  }
+
+  // Drops 'call', logging 'message' as the reason. Only LAUNCH calls
+  // need extra work; any other type of call is just logged.
+  void drop(const Call& call, const string& message)
+  {
+    VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message;
+
+    if (call.type() != Call::LAUNCH) {
+      return;
+    }
+
+    // We drop the tasks preemptively (enqueueing update events that
+    // put the task in TASK_LOST). This is a hack for now, to keep
+    // the tasks from being forever in PENDING state, when
+    // actually the master never received the launch.
+    // Unfortunately this is insufficient since it doesn't capture
+    // the case when the scheduler process sends it but the master
+    // never receives it (i.e., during a master failover). In the
+    // future, this should be solved by higher-level abstractions
+    // and this hack should be considered for removal.
+    foreach (const TaskInfo& task, call.launch().task_infos()) {
+      drop(task, message);
+    }
+  }
+
+private:
+  // Credential used to authenticate with a detected master. When None,
+  // authentication is skipped and 'connected' is invoked directly.
+  const Option<Credential> credential;
+
+  Mutex mutex; // Used to serialize the callback invocations.
+
+  // User supplied callbacks. Each is run via 'async' while 'mutex' is
+  // held.
+  lambda::function<void(void)> connected;
+  lambda::function<void(void)> disconnected;
+  lambda::function<void(const queue<Event>&)> received;
+
+  bool local; // Whether or not we launched a local cluster.
+
+  // Whether or not this is the first time we've sent a
+  // REREGISTER. This is to maintain compatibility with what the
+  // master expects from SchedulerProcess. After the first REGISTER or
+  // REREGISTER event we force this to be false.
+  bool failover;
+
+  // Used to (repeatedly) detect the leading master.
+  MasterDetector* detector;
+
+  // Events enqueued by 'receive' that have not yet been handed to the
+  // 'received' callback.
+  queue<Event> events;
+
+  // The currently detected master, or None if no master is detected.
+  Option<UPID> master;
+
+  // Non-NULL only while an authentication attempt is in flight:
+  // allocated in 'authenticate' and deleted in '_authenticate'.
+  sasl::Authenticatee* authenticatee;
+
+  // Indicates if an authentication attempt is in progress.
+  Option<Future<bool> > authenticating;
+
+  // Indicates if the authentication is successful.
+  bool authenticated;
+
+  // Indicates if a new authentication attempt should be enforced.
+  bool reauthenticate;
+};
+
+
+// Creates and spawns the underlying 'MesosProcess' without a
+// credential ('None()' is passed in its place).
+Mesos::Mesos(
+    const string& master,
+    const lambda::function<void(void)>& connected,
+    const lambda::function<void(void)>& disconnected,
+    const lambda::function<void(const queue<Event>&)>& received)
+{
+  process = new MesosProcess(
+      master,
+      None(),
+      connected,
+      disconnected,
+      received);
+
+  spawn(process);
+}
+
+
+// Creates and spawns the underlying 'MesosProcess', handing it
+// 'credential' along with the callbacks.
+Mesos::Mesos(
+    const string& master,
+    const Credential& credential,
+    const lambda::function<void(void)>& connected,
+    const lambda::function<void(void)>& disconnected,
+    const lambda::function<void(const queue<Event>&)>& received)
+{
+  process = new MesosProcess(
+      master,
+      credential,
+      connected,
+      disconnected,
+      received);
+
+  spawn(process);
+}
+
+
+// Tears down the underlying process: it is terminated and waited on
+// before being deleted.
+Mesos::~Mesos()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+// Forwards 'call' to the underlying process by dispatching
+// 'MesosProcess::send'; the call is not handled on the caller's stack.
+void Mesos::send(const Call& call)
+{
+  dispatch(process, &MesosProcess::send, call);
+}
+
+
+} // namespace scheduler {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 7183cb7..34df121 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -46,11 +46,12 @@
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/gc.hpp"
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/mesos_containerizer.hpp"
+
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 
@@ -134,12 +135,11 @@ TEST_F(MasterTest, TaskRunning)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Future<Nothing> resourcesUpdated;
   Future<Nothing> update;
   EXPECT_CALL(containerizer,
               update(_, Resources(offers.get()[0].resources())))
-    .WillOnce(DoAll(FutureSatisfy(&resourcesUpdated),
-                    Return(update)));
+    .WillOnce(DoAll(FutureSatisfy(&update),
+                    Return(Future<Nothing>())));
 
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -150,7 +150,7 @@ TEST_F(MasterTest, TaskRunning)
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
-  AWAIT_READY(resourcesUpdated);
+  AWAIT_READY(update);
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
@@ -210,12 +210,11 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
   EXPECT_CALL(exec, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
-  Future<Nothing> resourcesUpdated;
   Future<Nothing> update;
   EXPECT_CALL(containerizer,
               update(_, Resources(offers.get()[0].resources())))
-    .WillOnce(DoAll(FutureSatisfy(&resourcesUpdated),
-                    Return(update)));
+    .WillOnce(DoAll(FutureSatisfy(&update),
+                    Return(Future<Nothing>())));
 
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -226,7 +225,7 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
-  AWAIT_READY(resourcesUpdated);
+  AWAIT_READY(update);
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));

http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index bd3c22d..d87bf78 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -18,22 +18,30 @@
 
 #include <gmock/gmock.h>
 
+#include <queue>
 #include <vector>
 
+#include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
+#include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/queue.hpp>
 
 #include <process/metrics/metrics.hpp>
 
 #include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
 #include <stout/try.hpp>
 
 #include "master/master.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 
 using namespace mesos;
@@ -42,23 +50,167 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::scheduler::Call;
+using mesos::scheduler::Event;
+
 using process::Future;
+using process::Owned;
 using process::PID;
+using process::Promise;
+using process::Queue;
 
 using process::http::OK;
 
 using process::metrics::internal::MetricsProcess;
 
+using std::queue;
 using std::vector;
 
 using testing::_;
+using testing::AtMost;
+using testing::DoAll;
 using testing::Return;
 
 
-class SchedulerTest : public MesosTest {};
+// Fixture for tests of the low-level (callback based) scheduler API.
+class SchedulerTest : public MesosTest
+{
+protected:
+  // The Mesos scheduler API reports everything through callbacks, so
+  // this mock gives the tests something to use EXPECT_CALL on.
+  class Callbacks
+  {
+  public:
+    MOCK_METHOD0(connected, void());
+    MOCK_METHOD0(disconnected, void());
+    MOCK_METHOD1(received, void(const std::queue<Event>&));
+  };
+};
+
+
+// Enqueues all received events, in order, into the libprocess queue
+// that 'queue' points to.
+ACTION_P(Enqueue, queue)
+{
+  // Work on a copy since the argument is a const reference.
+  for (std::queue<Event> pending = arg0; !pending.empty(); pending.pop()) {
+    queue->put(pending.front());
+  }
+}
+
+
+// Drives the low-level scheduler API end to end: connect, register,
+// receive an offer, launch a task on it, and observe the TASK_RUNNING
+// status update.
+TEST_F(SchedulerTest, TaskRunning)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  // Route the three scheduler callbacks to the mock.
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  // Every event handed to 'received' ends up in this queue so that the
+  // test can consume events one at a time.
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  // Register the framework.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  // Remember the assigned framework ID; it is needed for the launch.
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0u, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<Nothing> update;
+  EXPECT_CALL(containerizer, update(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&update),
+                    Return(Future<Nothing>())))
+    .WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
+
+  // Build a task that uses all of the resources of the first offer.
+  TaskInfo taskInfo;
+  taskInfo.set_name("");
+  taskInfo.mutable_task_id()->set_value("1");
+  taskInfo.mutable_slave_id()->CopyFrom(
+      event.get().offers().offers(0).slave_id());
+  taskInfo.mutable_resources()->CopyFrom(
+      event.get().offers().offers(0).resources());
+  taskInfo.mutable_executor()->CopyFrom(DEFAULT_EXECUTOR_INFO);
+
+  // TODO(benh): Enable just running a task with a command in the tests:
+  //   taskInfo.mutable_command()->set_value("sleep 10");
+
+  // Launch the task on the first offer.
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::LAUNCH);
+    call.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+    call.mutable_launch()->add_offer_ids()->CopyFrom(
+        event.get().offers().offers(0).id());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  // Wait for the first 'update' call on the containerizer.
+  AWAIT_READY(update);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// TODO(benh): Write test for sending Call::Acknowledgement through
+// master to slave when Event::Update was generated locally.
+
+
+class MesosSchedulerDriverTest : public MesosTest {};
 
 
-TEST_F(SchedulerTest, MetricsEndpoint)
+TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);


[2/2] git commit: Added a concurrent 'Queue' to libprocess.

Posted by be...@apache.org.
Added a concurrent 'Queue' to libprocess.

Review: https://reviews.apache.org/r/20308


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7adb9d7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7adb9d7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7adb9d7

Branch: refs/heads/master
Commit: c7adb9d76ae4a667d74873690ab5b1a3fd3b1ad6
Parents: b4ec4eb
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Apr 12 19:17:57 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Jun 3 15:47:20 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am               |  2 +
 3rdparty/libprocess/include/process/queue.hpp | 87 ++++++++++++++++++++++
 3rdparty/libprocess/src/tests/queue_tests.cpp | 78 +++++++++++++++++++
 3 files changed, 167 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index b687068..87cf1ae 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -108,6 +108,7 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/process.hpp			\
   $(top_srcdir)/include/process/profiler.hpp			\
   $(top_srcdir)/include/process/protobuf.hpp			\
+  $(top_srcdir)/include/process/queue.hpp			\
   $(top_srcdir)/include/process/reap.hpp			\
   $(top_srcdir)/include/process/run.hpp				\
   $(top_srcdir)/include/process/sequence.hpp			\
@@ -142,6 +143,7 @@ tests_SOURCES =							\
   src/tests/metrics_tests.cpp					\
   src/tests/owned_tests.cpp					\
   src/tests/process_tests.cpp					\
+  src/tests/queue_tests.cpp					\
   src/tests/reap_tests.cpp					\
   src/tests/sequence_tests.cpp					\
   src/tests/shared_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
new file mode 100644
index 0000000..548c007
--- /dev/null
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -0,0 +1,87 @@
+#ifndef __PROCESS_QUEUE_HPP__
+#define __PROCESS_QUEUE_HPP__
+
+#include <deque>
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/internal.hpp>
+#include <process/owned.hpp>
+
+#include <stout/memory.hpp>
+
+namespace process {
+
+// A FIFO queue whose 'get' returns a future rather than blocking.
+//
+// Elements that are 'put' while nobody is waiting are buffered; calls
+// to 'get' on an empty queue are remembered as waiters and completed,
+// oldest first, by later calls to 'put'.
+//
+// The state lives behind a shared pointer, so copies of a 'Queue'
+// refer to the same underlying data.
+template <typename T>
+class Queue
+{
+public:
+  Queue() : data(new Data()) {}
+
+  // Hands 't' to the oldest waiter if there is one, otherwise buffers
+  // it for a future 'get'.
+  void put(const T& t)
+  {
+    Owned<Promise<T> > promise;
+
+    internal::acquire(&data->lock);
+    {
+      if (data->promises.empty()) {
+        data->elements.push(t);
+      } else {
+        promise = data->promises.front();
+        data->promises.pop_front();
+      }
+    }
+    internal::release(&data->lock);
+
+    // The waiter's promise is completed only after the lock has been
+    // released.
+    if (promise.get() != NULL) {
+      promise->set(t);
+    }
+  }
+
+  // Returns a future for the next element: already completed if an
+  // element is buffered, otherwise pending until a matching 'put'.
+  Future<T> get()
+  {
+    Future<T> future;
+
+    internal::acquire(&data->lock);
+    {
+      if (data->elements.empty()) {
+        data->promises.push_back(Owned<Promise<T> >(new Promise<T>()));
+        future = data->promises.back()->future();
+      } else {
+        future = Future<T>(data->elements.front());
+        data->elements.pop();
+      }
+    }
+    internal::release(&data->lock);
+
+    return future;
+  }
+
+private:
+  struct Data
+  {
+    Data() : lock(0) {}
+
+    ~Data()
+    {
+      // TODO(benh): Fail promises?
+    }
+
+    // Rather than use a process to serialize access to the queue's
+    // internal data we use a low-level "lock" which we acquire and
+    // release using atomic builtins.
+    int lock;
+
+    // Represents "waiters" for elements from the queue.
+    std::deque<Owned<Promise<T> > > promises;
+
+    // Represents elements already put in the queue.
+    std::queue<T> elements;
+  };
+
+  memory::shared_ptr<Data> data;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_QUEUE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/src/tests/queue_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/queue_tests.cpp b/3rdparty/libprocess/src/tests/queue_tests.cpp
new file mode 100644
index 0000000..79741d3
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/queue_tests.cpp
@@ -0,0 +1,78 @@
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <process/future.hpp>
+#include <process/queue.hpp>
+
+using namespace process;
+
+using std::string;
+
+// A 'get' issued before any 'put' stays pending until the 'put'.
+TEST(Queue, block)
+{
+  Queue<string> strings;
+
+  // Nothing has been put yet, so the result cannot be available.
+  Future<string> result = strings.get();
+
+  EXPECT_TRUE(result.isPending());
+
+  // The put completes the outstanding 'get'.
+  strings.put("hello world");
+
+  EXPECT_TRUE(result.isReady());
+  EXPECT_EQ("hello world", result.get());
+}
+
+
+// A 'get' issued after a 'put' is completed right away.
+TEST(Queue, noblock)
+{
+  Queue<string> strings;
+
+  // Buffer an element first; the following 'get' should not wait.
+  strings.put("world hello");
+
+  Future<string> result = strings.get();
+
+  EXPECT_TRUE(result.isReady());
+  EXPECT_EQ("world hello", result.get());
+}
+
+
+// Outstanding 'get's are completed one per 'put', oldest first.
+TEST(Queue, queue)
+{
+  Queue<string> strings;
+
+  // Three waiters and no elements: all of them have to wait until
+  // enough elements have been put.
+  Future<string> first = strings.get();
+  Future<string> second = strings.get();
+  Future<string> third = strings.get();
+
+  EXPECT_TRUE(first.isPending());
+  EXPECT_TRUE(second.isPending());
+  EXPECT_TRUE(third.isPending());
+
+  strings.put("hello");
+
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isPending());
+  EXPECT_TRUE(third.isPending());
+
+  strings.put("pretty");
+
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isReady());
+  EXPECT_TRUE(third.isPending());
+
+  strings.put("world");
+
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isReady());
+  EXPECT_TRUE(third.isReady());
+
+  // Elements are delivered in the order the waiters arrived.
+  EXPECT_EQ("hello", first.get());
+  EXPECT_EQ("pretty", second.get());
+  EXPECT_EQ("world", third.get());
+}