You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/06/04 00:47:55 UTC
[1/2] git commit: Introduced a low-level scheduler API.
Repository: mesos
Updated Branches:
refs/heads/master b4ec4eb98 -> 992ee1f65
Introduced a low-level scheduler API.
Review: https://reviews.apache.org/r/20309
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/992ee1f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/992ee1f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/992ee1f6
Branch: refs/heads/master
Commit: 992ee1f651195e591c38a34d0df0829f5e79497a
Parents: c7adb9d
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Mar 17 22:42:05 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Jun 3 15:47:20 2014 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 5 +-
include/mesos/scheduler.hpp | 83 ++-
include/mesos/scheduler/scheduler.hpp | 25 +
include/mesos/scheduler/scheduler.proto | 190 +++++++
src/Makefile.am | 48 +-
src/sched/sched.cpp | 5 +-
src/scheduler/scheduler.cpp | 820 +++++++++++++++++++++++++++
src/tests/master_tests.cpp | 17 +-
src/tests/scheduler_tests.cpp | 156 ++++-
9 files changed, 1324 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 82388e1..62f69d2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -494,6 +494,8 @@ enum TaskState {
TASK_FAILED = 3; // TERMINAL.
TASK_KILLED = 4; // TERMINAL.
TASK_LOST = 5; // TERMINAL.
+
+ // TODO(benh): TASK_ERROR = 7; // TERMINAL.
}
@@ -506,6 +508,7 @@ message TaskStatus {
optional string message = 4; // Possible message explaining state.
optional bytes data = 3;
optional SlaveID slave_id = 5;
+ optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.
optional double timestamp = 6;
}
@@ -629,7 +632,7 @@ message ACL {
}
-/*
+/**
* Collection of ACL.
*
* Each authorization request is evaluated against the ACLs in the order
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 85db111..a17db59 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,6 +19,14 @@
#ifndef __MESOS_SCHEDULER_HPP__
#define __MESOS_SCHEDULER_HPP__
+#if __cplusplus >= 201103L
+#include <functional>
+#else // __cplusplus >= 201103L
+#include <tr1/functional>
+#endif // __cplusplus >= 201103L
+
+#include <queue>
+
#include <pthread.h>
#include <string>
@@ -26,6 +34,8 @@
#include <mesos/mesos.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
/**
* Mesos scheduler interface and scheduler driver. A scheduler is used
@@ -41,10 +51,14 @@ namespace mesos {
// A few forward declarations.
class SchedulerDriver;
+namespace scheduler {
+class MesosProcess;
+} // namespace scheduler {
+
namespace internal {
class MasterDetector;
class SchedulerProcess;
-}
+} // namespace internal {
/**
@@ -428,6 +442,73 @@ protected:
internal::MasterDetector* detector;
};
+
+namespace scheduler {
+
+/**
+ * Interface to Mesos for a scheduler. Abstracts master detection
+ * (connection and disconnection) and authentication if some
+ * credentials are provided.
+ *
+ * Expects three callbacks, 'connected', 'disconnected', and
+ * 'received' which will get invoked _serially_ when it's determined
+ * that we've connected, disconnected, or received events from the
+ * master. Note that we drop events while disconnected but it's
+ * possible to receive a batch of events across a
+ * disconnected/connected transition before getting the disconnected
+ * and then connected callback.
+ *
+ * Instances are not copyable (see the private declarations below).
+ *
+ * TODO(benh): Don't include events in 'received' that occurred after a
+ * disconnected/connected transition.
+ **/
+class Mesos
+{
+public:
+  /**
+   * @param master Where to find the master: "local" launches an
+   *     in-process cluster, anything else is handed to the master
+   *     detector.
+   * @param connected Invoked when a master has been detected (and,
+   *     if credentials were supplied, authenticated with).
+   * @param disconnected Invoked when no master is detected.
+   * @param received Invoked with a batch of events from the master.
+   **/
+  Mesos(const std::string& master,
+#if __cplusplus >= 201103L
+        const std::function<void(void)>& connected,
+        const std::function<void(void)>& disconnected,
+        const std::function<void(const std::queue<Event>&)>& received);
+#else // __cplusplus >= 201103L
+        const std::tr1::function<void(void)>& connected,
+        const std::tr1::function<void(void)>& disconnected,
+        const std::tr1::function<void(const std::queue<Event>&)>& received);
+#endif // __cplusplus >= 201103L
+
+  /**
+   * Same as the above constructor but takes 'credential' as argument.
+   *
+   * The credential will be used for authenticating with the master.
+   *
+   **/
+  Mesos(const std::string& master,
+        const Credential& credential,
+#if __cplusplus >= 201103L
+        const std::function<void(void)>& connected,
+        const std::function<void(void)>& disconnected,
+        const std::function<void(const std::queue<Event>&)>& received);
+#else // __cplusplus >= 201103L
+        const std::tr1::function<void(void)>& connected,
+        const std::tr1::function<void(void)>& disconnected,
+        const std::tr1::function<void(const std::queue<Event>&)>& received);
+#endif // __cplusplus >= 201103L
+
+  virtual ~Mesos();
+
+  /**
+   * Attempts to send a call to the master.
+   *
+   * Some local validation of calls is performed which may generate
+   * events without ever being sent to the master. This includes when
+   * calls are sent but no master is currently detected (i.e., we're
+   * disconnected).
+   */
+  virtual void send(const Call& call);
+
+private:
+  // Non-copyable. Each instance drives its own MesosProcess through
+  // 'process'; a compiler-generated shallow copy would leave two
+  // objects sharing (and, assuming the destructor tears it down,
+  // both destroying) the same process. Declared but intentionally
+  // not defined, since this header must also build without C++11
+  // '= delete'.
+  Mesos(const Mesos&);
+  Mesos& operator=(const Mesos&);
+
+  MesosProcess* process;
+};
+
+} // namespace scheduler {
} // namespace mesos {
#endif // __MESOS_SCHEDULER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.hpp b/include/mesos/scheduler/scheduler.hpp
new file mode 100644
index 0000000..7aebebf
--- /dev/null
+++ b/include/mesos/scheduler/scheduler.hpp
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SCHEDULER_PROTO_HPP__
+#define __SCHEDULER_PROTO_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <scheduler/scheduler.pb.h>
+
+#endif // __SCHEDULER_PROTO_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
new file mode 100644
index 0000000..4deda55
--- /dev/null
+++ b/include/mesos/scheduler/scheduler.proto
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos.proto";
+
+package mesos.scheduler;
+
+option java_package = "org.apache.mesos.scheduler";
+option java_outer_classname = "Protos";
+
+
+/**
+ * Low-level scheduler event API.
+ *
+ * An event is described using the standard protocol buffer "union"
+ * trick, see https://developers.google.com/protocol-buffers/docs/techniques#union.
+ */
+message Event {
+ // Possible event types, followed by message definitions if
+ // applicable.
+ enum Type {
+ REGISTERED = 1;
+ REREGISTERED = 2;
+ OFFERS = 3;
+ RESCIND = 4;
+ UPDATE = 5;
+ MESSAGE = 6;
+ FAILURE = 7;
+ ERROR = 8;
+ }
+
+ // Payload for REGISTERED: the FrameworkID the framework is
+ // registered under and the MasterInfo of the master it registered
+ // with.
+ message Registered {
+ required FrameworkID framework_id = 1;
+ required MasterInfo master_info = 2;
+ }
+
+ // Payload for REREGISTERED; carries the same fields as Registered.
+ message Reregistered {
+ required FrameworkID framework_id = 1;
+ required MasterInfo master_info = 2;
+ }
+
+ // Payload for OFFERS: a batch of resource offers.
+ message Offers {
+ repeated Offer offers = 1;
+ }
+
+ // Payload for RESCIND: the offer being withdrawn (presumably no
+ // longer usable by the framework -- confirm against the master).
+ message Rescind {
+ required OfferID offer_id = 1;
+ }
+
+ // Payload for UPDATE: a task status update.
+ // NOTE(review): 'uuid' presumably must be echoed back in
+ // Call.Acknowledge.uuid to acknowledge this update.
+ message Update {
+ required bytes uuid = 1; // TODO(benh): Replace with UpdateID.
+ required TaskStatus status = 2;
+ }
+
+ // Payload for MESSAGE: opaque 'data' associated with the executor
+ // identified by 'slave_id'/'executor_id' (assumed to be
+ // executor-to-framework traffic; confirm).
+ message Message {
+ required SlaveID slave_id = 1;
+ required ExecutorID executor_id = 2;
+ required bytes data = 3;
+ }
+
+ // Payload for FAILURE. NOTE(review): without 'executor_id' this
+ // presumably reports the loss of the slave 'slave_id' as a whole.
+ message Failure {
+ optional SlaveID slave_id = 1;
+
+ // If this was just a failure of an executor on a slave then
+ // 'executor_id' will be set and possibly 'status' (if we were
+ // able to determine the exit status).
+ optional ExecutorID executor_id = 2;
+ optional int32 status = 3;
+ }
+
+ // Payload for ERROR: a description of the error.
+ message Error {
+ required string message = 1;
+ }
+
+ // TODO(benh): Add a 'from' or 'sender'.
+
+ // Type of the event, indicates which optional field below should be
+ // present if that type has a nested message definition.
+ required Type type = 1;
+
+ optional Registered registered = 2;
+ optional Reregistered reregistered = 3;
+ optional Offers offers = 4;
+ optional Rescind rescind = 5;
+ optional Update update = 6;
+ optional Message message = 7;
+ optional Failure failure = 8;
+ optional Error error = 9;
+}
+
+
+/**
+ * Low-level scheduler call API.
+ *
+ * Like Event, a Call is described using the standard protocol buffer
+ * "union" trick (see above).
+ */
+message Call {
+ // Possible call types, followed by message definitions if
+ // applicable.
+ enum Type {
+ REGISTER = 1;
+ REREGISTER = 2;
+ UNREGISTER = 3;
+ REQUEST = 4;
+ DECLINE = 5;
+ REVIVE = 6;
+ LAUNCH = 7;
+ KILL = 8;
+ ACKNOWLEDGE = 9;
+ RECONCILE = 10;
+ MESSAGE = 11;
+
+ // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
+ // already registered frameworks as a way of stopping offers from
+ // being generated and other events from being sent by the master.
+ // Note that this functionality existed originally to support
+ // SchedulerDriver::abort which was only necessary to handle
+ // exceptions getting thrown from within Scheduler callbacks,
+ // something that is not an issue with the Event/Call API.
+ }
+
+ // Payload for REQUEST: resource requests forwarded to the master.
+ message Request {
+ repeated mesos.Request requests = 1;
+ }
+
+ // Payload for DECLINE. The C++ scheduler library sends this to the
+ // master as a launch of zero tasks on 'offer_ids', with 'filters'.
+ message Decline {
+ repeated OfferID offer_ids = 1;
+ optional Filters filters = 2;
+ }
+
+ // Payload for LAUNCH: 'task_infos' to run using 'offer_ids'. Before
+ // sending, the C++ library drops any task that has both or neither
+ // of 'executor' and 'command', or whose executor names a different
+ // FrameworkID.
+ message Launch {
+ repeated TaskInfo task_infos = 1;
+ repeated OfferID offer_ids = 2;
+ optional Filters filters = 3;
+ }
+
+ // Payload for KILL: the task to kill.
+ message Kill {
+ required TaskID task_id = 1;
+ }
+
+ // Payload for ACKNOWLEDGE: identifies the status update being
+ // acknowledged. NOTE(review): 'uuid' presumably comes from
+ // Event.Update.uuid.
+ message Acknowledge {
+ required SlaveID slave_id = 1;
+ required TaskID task_id = 2;
+ required bytes uuid = 3;
+ }
+
+ // Payload for RECONCILE: the task statuses the framework knows of.
+ message Reconcile {
+ // TODO(benh): Send TaskID's instead, see MESOS-1453.
+ repeated TaskStatus statuses = 1;
+ }
+
+ // Payload for MESSAGE: opaque 'data' for the executor identified by
+ // 'slave_id'/'executor_id'.
+ message Message {
+ required SlaveID slave_id = 1;
+ required ExecutorID executor_id = 2;
+ required bytes data = 3;
+ }
+
+ // Identifies who generated this call. Always necessary, but the
+ // only thing that needs to be set for certain calls, e.g.,
+ // REGISTER, REREGISTER, and UNREGISTER.
+ //
+ // The C++ library fills in 'user' with the current OS user when it
+ // is empty, and rejects every call other than REGISTER whose
+ // FrameworkInfo lacks 'id'.
+ required FrameworkInfo framework_info = 1;
+
+ // Type of the call, indicates which optional field below should be
+ // present if that type has a nested message definition.
+ required Type type = 2;
+
+ optional Request request = 3;
+ optional Decline decline = 4;
+ optional Launch launch = 5;
+ optional Kill kill = 6;
+ optional Acknowledge acknowledge = 7;
+ optional Reconcile reconcile = 8;
+ optional Message message = 9;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index ffde59b..c3ecb94 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -110,19 +110,26 @@ endif
MESOS_PROTO = $(top_srcdir)/include/mesos/mesos.proto
CONTAINERIZER_PROTO = \
- $(top_srcdir)/include/mesos/containerizer/containerizer.proto
+ $(top_srcdir)/include/mesos/containerizer/containerizer.proto
+
+SCHEDULER_PROTO = \
+ $(top_srcdir)/include/mesos/scheduler/scheduler.proto
CXX_PROTOS = \
- mesos.pb.cc mesos.pb.h \
- containerizer/containerizer.pb.cc containerizer/containerizer.pb.h
+ mesos.pb.cc \
+ mesos.pb.h \
+ containerizer/containerizer.pb.cc \
+ containerizer/containerizer.pb.h \
+ scheduler/scheduler.pb.cc \
+ scheduler/scheduler.pb.h
JAVA_PROTOS = \
- java/generated/org/apache/mesos/Protos.java \
- java/generated/org/apache/mesos/containerizer/Protos.java
+ java/generated/org/apache/mesos/Protos.java \
+ java/generated/org/apache/mesos/containerizer/Protos.java \
+ java/generated/org/apache/mesos/scheduler/Protos.java
PYTHON_PROTOS = \
- python/src/mesos_pb2.py \
- python/src/containerizer_pb2.py
+ python/src/mesos_pb2.py \
+ python/src/containerizer_pb2.py \
+ python/src/scheduler_pb2.py
+
+# Every generated protobuf output (C++, Java and Python, for each
+# .proto above) is listed so it is built up front via BUILT_SOURCES
+# and removed again by 'make clean'; outputs not listed here are
+# never built even if a rule for them exists.
BUILT_SOURCES += $(CXX_PROTOS) $(JAVA_PROTOS) $(PYTHON_PROTOS)
CLEANFILES += $(CXX_PROTOS) $(JAVA_PROTOS) $(PYTHON_PROTOS)
@@ -156,6 +163,10 @@ containerizer/%.pb.cc containerizer/%.pb.h: $(CONTAINERIZER_PROTO)
$(MKDIR_P) $(@D)
$(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
+# Generate the C++ bindings for the low-level scheduler API
+# (scheduler/scheduler.pb.cc and scheduler/scheduler.pb.h).
+scheduler/%.pb.cc scheduler/%.pb.h: $(SCHEDULER_PROTO)
+ $(MKDIR_P) $(@D)
+ $(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
+
%.pb.cc %.pb.h: %.proto
 $(MKDIR_P) $(@D)
 $(PROTOC) $(PROTOCFLAGS) --cpp_out=. $^
@@ -169,6 +180,10 @@ java/generated/org/apache/mesos/containerizer/Protos.java: \
 $(MKDIR_P) $(@D)
 $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+# Generate the Java bindings; the output path follows the
+# 'java_package'/'java_outer_classname' options in scheduler.proto.
+# Only built when something depends on this target.
+java/generated/org/apache/mesos/scheduler/Protos.java: $(SCHEDULER_PROTO)
+ $(MKDIR_P) $(@D)
+ $(PROTOC) $(PROTOCFLAGS) --java_out=java/generated $^
+
python/src/mesos_pb2.py: $(MESOS_PROTO)
 $(MKDIR_P) $(@D)
 $(PROTOC) $(PROTOCFLAGS) --python_out=python/src $^
@@ -178,6 +193,11 @@ python/src/containerizer_pb2.py: $(CONTAINERIZER_PROTO)
 $(PROTOC) -I$(top_srcdir)/include/mesos/containerizer \
 $(PROTOCFLAGS) --python_out=python/src $^
+# Generate the Python bindings; the extra -I mirrors the
+# containerizer rule above. Only built when something depends on
+# this target.
+python/src/scheduler_pb2.py: $(SCHEDULER_PROTO)
+ $(MKDIR_P) $(@D)
+ $(PROTOC) -I$(top_srcdir)/include/mesos/scheduler \
+ $(PROTOCFLAGS) --python_out=python/src $^
+
# We even use a convenience library for most of Mesos so that we can
# exclude third party libraries so setuptools/distribute can build a
# self-contained Python library and statically link in the third party
@@ -196,6 +216,7 @@ libmesos_no_3rdparty_la_SOURCES = \
sasl/auxprop.hpp \
sasl/auxprop.cpp \
sched/sched.cpp \
+ scheduler/scheduler.cpp \
local/local.cpp \
master/contender.cpp \
master/constants.cpp \
@@ -253,8 +274,15 @@ containerizer_HEADERS = \
$(top_srcdir)/include/mesos/containerizer/containerizer.hpp \
$(top_srcdir)/include/mesos/containerizer/containerizer.proto
-nodist_containerizer_HEADERS = \
- containerizer/containerizer.pb.h
+nodist_containerizer_HEADERS = containerizer/containerizer.pb.h
+
+# Install the low-level scheduler API header and its .proto under
+# $(pkgincludedir)/scheduler. The generated scheduler.pb.h is
+# 'nodist' because it is produced at build time by protoc rather
+# than shipped in the source tarball.
+schedulerdir = $(pkgincludedir)/scheduler
+
+scheduler_HEADERS = \
+ $(top_srcdir)/include/mesos/scheduler/scheduler.hpp \
+ $(top_srcdir)/include/mesos/scheduler/scheduler.proto
+
+nodist_scheduler_HEADERS = scheduler/scheduler.pb.h
if OS_LINUX
libmesos_no_3rdparty_la_SOURCES += linux/cgroups.cpp
@@ -453,7 +481,7 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la
lib_LTLIBRARIES += libmesos.la
# Include as part of the distribution.
-libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO)
+libmesos_la_SOURCES = $(MESOS_PROTO) $(CONTAINERIZER_PROTO) $(SCHEDULER_PROTO)
libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION) -shared
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index be23e01..b27222e 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -56,6 +56,7 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/stopwatch.hpp>
+#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "sasl/authenticatee.hpp"
@@ -86,6 +87,7 @@ using std::vector;
using process::wait; // Necessary on some OS's to disambiguate.
+using utils::copy;
namespace mesos {
namespace internal {
@@ -264,8 +266,7 @@ protected:
// are here, making the 'discard' here a no-op. This is ok
// because we set 'reauthenticate' here which enforces a retry
// in '_authenticate'.
- Future<bool> authenticating_ = authenticating.get();
- authenticating_.discard();
+ copy(authenticating.get()).discard();
reauthenticate = true;
return;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
new file mode 100644
index 0000000..9e8975e
--- /dev/null
+++ b/src/scheduler/scheduler.cpp
@@ -0,0 +1,820 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <dlfcn.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/async.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/mutex.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/error.hpp>
+#include <stout/flags.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+#include "sasl/authenticatee.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "master/detector.hpp"
+
+#include "local/local.hpp"
+
+#include "master/detector.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+#include "messages/messages.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::master;
+
+using namespace process;
+
+using std::queue;
+using std::string;
+using std::vector;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+namespace mesos {
+namespace scheduler {
+
+// The process (below) is responsible for receiving messages
+// (eventually events) from the master and sending messages (via
+// calls) to the master.
+class MesosProcess : public ProtobufProcess<MesosProcess>
+{
+public:
+  // Creates the process backing a 'Mesos' instance.
+  //
+  // 'master' is either "local", which launches an in-process cluster
+  // via local::launch() and connects to it, or a string handed to
+  // MasterDetector::create(). '_credential', if set, is used to
+  // authenticate with each newly detected master. The three callbacks
+  // are stored and later invoked through 'async' while 'mutex' is
+  // held.
+  //
+  // Failures (loading MESOS_* flags, creating the detector) are
+  // reported through error(...) and leave 'detector' NULL.
+  MesosProcess(
+      const string& master,
+      const Option<Credential>& _credential,
+      const lambda::function<void(void)>& _connected,
+      const lambda::function<void(void)>& _disconnected,
+      lambda::function<void(const queue<Event>&)> _received)
+    : ProcessBase(ID::generate("scheduler")),
+      credential(_credential),
+      connected(_connected),
+      disconnected(_disconnected),
+      received(_received),
+      local(false),
+      failover(true),
+      detector(NULL),
+      authenticatee(NULL),
+      authenticating(None()),
+      authenticated(false),
+      reauthenticate(false)
+  {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+    // Load any flags from the environment (we use local::Flags in the
+    // event we run in 'local' mode, since it inherits
+    // logging::Flags). In the future, just as the TODO in
+    // local/main.cpp discusses, we'll probably want a way to load
+    // master::Flags and slave::Flags as well.
+    local::Flags flags;
+
+    Try<Nothing> load = flags.load("MESOS_");
+
+    if (load.isError()) {
+      error("Failed to load flags: " + load.error());
+      return;
+    }
+
+    // Initialize libprocess (done here since at some point we might
+    // want to use flags to initialize libprocess).
+    process::initialize();
+
+    logging::initialize("mesos", flags);
+
+    LOG(INFO) << "Version: " << MESOS_VERSION;
+
+    // Launch a local cluster if necessary.
+    Option<UPID> pid = None();
+    if (master == "local") {
+      pid = local::launch(flags);
+      local = true;
+    }
+
+    Try<MasterDetector*> detector_ =
+      MasterDetector::create(pid.isSome() ? string(pid.get()) : master);
+
+    if (detector_.isError()) {
+      error("Failed to create a master detector: " + detector_.error());
+      return;
+    }
+
+    // Save the detector so we can delete it later.
+    detector = detector_.get();
+  }
+
+  // Releases what the constructor and authentication allocated, shuts
+  // down the in-process cluster if one was launched, and then waits
+  // for any in-flight user callback (run while holding 'mutex') to
+  // finish.
+  //
+  // NOTE(review): assumes the process has already been terminated
+  // when this runs, so callbacks deferred to self() by a pending
+  // detection future are dropped rather than executed.
+  virtual ~MesosProcess()
+  {
+    delete authenticatee;
+
+    // Created in the constructor; may still be NULL if creation
+    // failed, which 'delete' tolerates.
+    delete detector;
+
+    // Check and see if we need to shutdown a local cluster.
+    if (local) {
+      local::shutdown();
+    }
+
+    // Wait for any callbacks to finish.
+    mutex.lock().await();
+  }
+
+  // TODO(benh): Move this to 'protected'.
+  using ProtobufProcess<MesosProcess>::send;
+
+  // Validates 'call' locally and, if it passes, translates it into the
+  // corresponding internal master message and sends it to the
+  // currently detected master. A call that is made while no master is
+  // detected, or that fails validation, is handed to drop(...) with a
+  // reason instead of being sent.
+  //
+  // 'call' is taken by value because it may be modified: an empty
+  // FrameworkInfo.user is replaced with the current OS user.
+  void send(Call call)
+  {
+    if (master.isNone()) {
+      drop(call, "Disconnected");
+      return;
+    }
+
+    // If no user was specified in FrameworkInfo, use the current user.
+    // TODO(benh): Make FrameworkInfo.user be optional and add a
+    // 'user' to either TaskInfo or CommandInfo.
+    if (call.framework_info().user() == "") {
+      call.mutable_framework_info()->set_user(os::user());
+    }
+
+    // Only a REGISTER should not have set the framework ID.
+    if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
+      drop(call, "Call is missing FrameworkInfo.id");
+      return;
+    }
+
+    // Rejects calls with any 'required' protobuf field unset.
+    if (!call.IsInitialized()) {
+      drop(call, "Call is not properly initialized: " +
+           call.InitializationErrorString());
+      return;
+    }
+
+    switch (call.type()) {
+      case Call::REGISTER: {
+        RegisterFrameworkMessage message;
+        message.mutable_framework()->CopyFrom(call.framework_info());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REREGISTER: {
+        ReregisterFrameworkMessage message;
+        message.mutable_framework()->CopyFrom(call.framework_info());
+        message.set_failover(failover);
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::UNREGISTER: {
+        UnregisterFrameworkMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REQUEST: {
+        ResourceRequestMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_requests()->CopyFrom(call.request().requests());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::DECLINE: {
+        // Declining is expressed to the master as launching no tasks
+        // on the given offers (with the given filters).
+        LaunchTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_filters()->CopyFrom(call.decline().filters());
+        message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::REVIVE: {
+        ReviveOffersMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::LAUNCH: {
+        // We do some local validation here, but really this should
+        // all happen in the master so it's only implemented once.
+        vector<TaskInfo> tasks;
+
+        foreach (const TaskInfo& task, call.launch().task_infos()) {
+          // Check that each TaskInfo has either an ExecutorInfo or a
+          // CommandInfo but not both.
+          if (task.has_executor() == task.has_command()) {
+            drop(task,
+                 "TaskInfo must have either an 'executor' or a 'command'");
+            continue;
+          }
+
+          // Ensure ExecutorInfo::framework_id is valid, if present.
+          if (task.has_executor() &&
+              task.executor().has_framework_id() &&
+              !(task.executor().framework_id() == call.framework_info().id())) {
+            drop(task,
+                 "ExecutorInfo has an invalid FrameworkID (Actual: " +
+                 stringify(task.executor().framework_id()) + " vs Expected: " +
+                 stringify(call.framework_info().id()) + ")");
+            continue;
+          }
+
+          tasks.push_back(task);
+
+          // Set ExecutorInfo::framework_id if missing since this
+          // field was added to the API later and thus was made
+          // optional.
+          if (task.has_executor() && !task.executor().has_framework_id()) {
+            tasks.back().mutable_executor()->mutable_framework_id()->CopyFrom(
+                call.framework_info().id());
+          }
+        }
+
+        // Dropped tasks are simply omitted; the offers are still sent.
+        LaunchTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_filters()->CopyFrom(call.launch().filters());
+        message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids());
+
+        foreach (const TaskInfo& task, tasks) {
+          message.add_tasks()->CopyFrom(task);
+        }
+
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::KILL: {
+        KillTaskMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_task_id()->CopyFrom(call.kill().task_id());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::ACKNOWLEDGE: {
+        StatusUpdateAcknowledgementMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
+        message.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
+        message.set_uuid(call.acknowledge().uuid());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::RECONCILE: {
+        ReconcileTasksMessage message;
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
+        send(master.get(), message);
+        break;
+      }
+
+      case Call::MESSAGE: {
+        FrameworkToExecutorMessage message;
+        message.mutable_slave_id()->CopyFrom(call.message().slave_id());
+        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
+        message.mutable_executor_id()->CopyFrom(call.message().executor_id());
+        message.set_data(call.message().data());
+        send(master.get(), message);
+        break;
+      }
+
+      default:
+        VLOG(1) << "Unexpected call " << stringify(call.type());
+        break;
+    }
+  }
+
+protected:
+  // Registers handlers for the internal master messages and, if a
+  // master detector was successfully created, starts detecting masters.
+  virtual void initialize()
+  {
+    install<FrameworkRegisteredMessage>(&MesosProcess::receive);
+    install<FrameworkReregisteredMessage>(&MesosProcess::receive);
+    install<ResourceOffersMessage>(&MesosProcess::receive);
+    install<RescindResourceOfferMessage>(&MesosProcess::receive);
+    install<StatusUpdateMessage>(&MesosProcess::receive);
+    install<LostSlaveMessage>(&MesosProcess::receive);
+    install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
+    install<FrameworkErrorMessage>(&MesosProcess::receive);
+
+    // The constructor leaves 'detector' NULL (after reporting an
+    // error) when loading flags or creating the detector fails;
+    // dereferencing it here would crash.
+    if (detector == NULL) {
+      return;
+    }
+
+    // Start detecting masters.
+    detector->detect()
+      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
+  }
+
+ // Handles the result of a master detection.
+ //
+ // - A failed detection reports an error and stops: detection is NOT
+ //   re-armed in that case.
+ // - No master: clears 'master' and runs the 'disconnected' callback
+ //   while holding 'mutex'.
+ // - A master: records it, then either starts authentication (when a
+ //   credential was supplied) or runs the 'connected' callback while
+ //   holding 'mutex'.
+ // Detection is then re-armed with the current result.
+ //
+ // NOTE(review): switching directly from one master to another does
+ // not invoke 'disconnected' first; only the no-master case does.
+ void detected(const Future<Option<MasterInfo> >& future)
+ {
+ CHECK(!future.isDiscarded());
+
+ if (future.isFailed()) {
+ error("Failed to detect a master: " + future.failure());
+ return;
+ }
+
+ if (future.get().isNone()) {
+ master = None();
+
+ VLOG(1) << "No master detected";
+
+ // The mutex is released once the callback's future completes.
+ mutex.lock()
+ .then(defer(self(), &Self::_detected))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+ } else {
+ master = UPID(future.get().get().pid());
+
+ VLOG(1) << "New master detected at " << master.get();
+
+ if (credential.isSome()) {
+ // Authenticate with the master.
+ authenticate();
+ } else {
+ mutex.lock()
+ .then(defer(self(), &Self::__detected))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+ }
+ }
+
+ // Keep detecting masters. NOTE(review): passing the previous result
+ // presumably makes detect() complete only when it changes.
+ detector->detect(future.get())
+ .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
+ }
+
+  // Both helpers run with 'mutex' held (see the lock chains in
+  // detected()). Each hands a user callback to libprocess 'async' and
+  // returns its future, so the mutex is released only after the
+  // callback has finished.
+
+  // No master detected: run the user's 'disconnected' callback.
+  Future<Nothing> _detected()
+  {
+    const Future<Nothing> finished = async(disconnected);
+    return finished;
+  }
+
+  // Master detected (no authentication needed): run 'connected'.
+  Future<Nothing> __detected()
+  {
+    const Future<Nothing> finished = async(connected);
+    return finished;
+  }
+
+ // Starts authenticating with the currently detected master using
+ // 'credential'.
+ //
+ // If an attempt is already in flight, it is discarded and
+ // 'reauthenticate' is set, so '_authenticate' retries rather than
+ // starting a second attempt here. Otherwise a new sasl::Authenticatee
+ // is created, its completion is routed to '_authenticate', and a
+ // 5 second 'authenticationTimeout' is armed for this attempt.
+ void authenticate()
+ {
+ authenticated = false;
+
+ // We retry to authenticate and it's possible that we'll get
+ // disconnected while that is happening.
+ if (master.isNone()) {
+ return;
+ }
+
+ if (authenticating.isSome()) {
+ // Authentication is in progress. Try to cancel it.
+ // Note that it is possible that 'authenticating' is ready
+ // and the dispatch to '_authenticate' is enqueued when we
+ // are here, making the 'discard' here a no-op. This is ok
+ // because we set 'reauthenticate' here which enforces a retry
+ // in '_authenticate'.
+ Future<bool>(authenticating.get()).discard();
+ reauthenticate = true;
+ return;
+ }
+
+ VLOG(1) << "Authenticating with master " << master.get();
+
+ CHECK_SOME(credential);
+
+ CHECK(authenticatee == NULL);
+ authenticatee = new sasl::Authenticatee(credential.get(), self());
+
+ // NOTE: We do not pass 'Owned<Authenticatee>' here because doing
+ // so could make 'AuthenticateeProcess' responsible for deleting
+ // 'Authenticatee' causing a deadlock because the destructor of
+ // 'Authenticatee' waits on 'AuthenticateeProcess'.
+ // This will happen in the following scenario:
+ // --> 'AuthenticateeProcess' does a 'Future.set()'.
+ // --> '_authenticate()' is dispatched to this process.
+ // --> This process executes '_authenticate()'.
+ // --> 'AuthenticateeProcess' removes the onAny callback
+ // from its queue which holds the last reference to
+ // 'Authenticatee'.
+ // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
+ // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
+ authenticating = authenticatee->authenticate(master.get())
+ .onAny(defer(self(), &Self::_authenticate));
+
+ // The timer gets its own copy of this attempt's future, so it can
+ // only ever discard this attempt (see authenticationTimeout).
+ delay(Seconds(5),
+ self(),
+ &Self::authenticationTimeout,
+ authenticating.get());
+ }
+
+  // Completion handler for an authentication attempt started by
+  // authenticate(). Releases the authenticatee and then:
+  // - no master detected anymore: gives up until a new master appears;
+  // - attempt superseded ('reauthenticate') or not successfully
+  //   completed (failed/discarded, e.g. by the timeout): retries;
+  // - master refused the credential: reports an error;
+  // - success: runs the user's 'connected' callback while holding
+  //   'mutex'.
+  // In every case 'authenticating' is cleared, so a later
+  // authenticate() can start a fresh attempt.
+  void _authenticate()
+  {
+    delete CHECK_NOTNULL(authenticatee);
+    authenticatee = NULL;
+
+    CHECK_SOME(authenticating);
+
+    // Copy the future (cheap; futures share state) rather than binding
+    // a reference into 'authenticating', which is reset below.
+    const Future<bool> future = authenticating.get();
+
+    if (master.isNone()) {
+      VLOG(1) << "Ignoring authentication because no master is detected";
+      authenticating = None();
+
+      // Set it to false because we do not want further retries until
+      // a new master is detected.
+      // We obviously do not need to reauthenticate either even if
+      // 'reauthenticate' is currently true because the master is
+      // lost.
+      reauthenticate = false;
+      return;
+    }
+
+    if (reauthenticate || !future.isReady()) {
+      VLOG(1)
+        << "Failed to authenticate with master " << master.get() << ": "
+        << (reauthenticate ? "master changed" :
+            (future.isFailed() ? future.failure() : "future discarded"));
+
+      authenticating = None();
+      reauthenticate = false;
+
+      // TODO(vinod): Add a limit on number of retries.
+      dispatch(self(), &Self::authenticate); // Retry.
+      return;
+    }
+
+    if (!future.get()) {
+      VLOG(1) << "Master " << master.get() << " refused authentication";
+
+      // Clear the in-progress marker. Otherwise a later authenticate()
+      // (e.g. after a new master is detected) would find it still set,
+      // only attempt a no-op discard of this completed future, and
+      // never start a new attempt.
+      authenticating = None();
+
+      error("Authentication refused");
+      return;
+    }
+
+    VLOG(1) << "Successfully authenticated with master " << master.get();
+
+    authenticated = true;
+    authenticating = None();
+
+    mutex.lock()
+      .then(defer(self(), &Self::__authenticate))
+      .onAny(lambda::bind(&Mutex::unlock, mutex));
+  }
+
+  // Runs with 'mutex' held after a successful authentication: hands
+  // the user's 'connected' callback to libprocess 'async'. The mutex
+  // is released once the returned future completes.
+  Future<Nothing> __authenticate()
+  {
+    const Future<Nothing> finished = async(connected);
+    return finished;
+  }
+
+  // Fired by the timer armed in authenticate() with that attempt's own
+  // copy of the future. Discarding it makes '_authenticate()' treat the
+  // attempt as not ready and retry. Because the copy belongs to the
+  // attempt that armed the timer, a newer attempt is never affected.
+  void authenticationTimeout(Future<bool> future)
+  {
+    const bool discarded = future.discard();
+
+    // discard() does nothing when, for instance, the attempt had
+    // already completed; there is then nothing to report.
+    if (!discarded) {
+      return;
+    }
+
+    LOG(WARNING) << "Authentication timed out";
+  }
+
+  // Filters an incoming event and enqueues it for the 'received'
+  // callback.
+  //
+  // An event from a remote sender is dropped when no master is
+  // detected (unless it is REGISTERED/REREGISTERED), or when the sender
+  // is not the currently detected master. A locally injected event is
+  // always enqueued.
+  //
+  // NOTE: A None 'from' is possible when an event is injected locally.
+  void receive(const Option<UPID>& from, const Event& event)
+  {
+    // Check if we're disconnected but received an event.
+    if (from.isSome() && master.isNone()) {
+      // Ignore the event unless it's a registered or reregistered.
+      if (event.type() != Event::REGISTERED &&
+          event.type() != Event::REREGISTERED) {
+        VLOG(1) << "Ignoring " << stringify(event.type())
+                << " event because we're disconnected";
+        return;
+      }
+    } else if (from.isSome() && master != from) {
+      VLOG(1)
+        << "Ignoring " << stringify(event.type())
+        << " event because it was sent from '" << from.get()
+        << "' instead of the leading master '" << master.get() << "'";
+      return;
+    }
+
+    // Note that if 'from' is None we're locally injecting this event
+    // so we always want to enqueue it even if we're not connected!
+
+    // Both arms of the conditional are strings on purpose. Mixing a
+    // string literal with a 'UPID' makes the literal get implicitly
+    // converted to a (meaningless) 'UPID' (libprocess's 'UPID' has a
+    // non-explicit 'const char*' constructor), so the log would never
+    // show "(locally injected)".
+    VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from "
+            << (from.isNone() ? string("(locally injected)")
+                              : stringify(from.get()));
+
+    // Queue up the event and invoke the 'received' callback if this
+    // is the first event (between now and when the 'received'
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), &Self::_receive))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
+  }
+
+  // Hands the whole batch of pending events to the 'received' callback
+  // (run via 'async' while 'mutex' is held) and starts a new batch.
+  Future<Nothing> _receive()
+  {
+    // NOTE: 'events' is emptied right after being handed to 'async',
+    // which relies on 'async' having taken its own copy of the batch.
+    Future<Nothing> invoked = async(received, events);
+
+    while (!events.empty()) {
+      events.pop();
+    }
+
+    return invoked;
+  }
+
+  // Translates a registration acknowledgement from the master into a
+  // REGISTERED event.
+  void receive(const UPID& from, const FrameworkRegisteredMessage& message)
+  {
+    // Having (re-)registered once means we are no longer failing over;
+    // see the declaration of 'failover'.
+    failover = false;
+
+    Event event;
+    event.set_type(Event::REGISTERED);
+    event.mutable_registered()->mutable_framework_id()->CopyFrom(
+        message.framework_id());
+    event.mutable_registered()->mutable_master_info()->CopyFrom(
+        message.master_info());
+
+    receive(from, event);
+  }
+
+  // Translates a re-registration acknowledgement from the master into a
+  // REREGISTERED event.
+  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
+  {
+    // Having (re-)registered once means we are no longer failing over;
+    // see the declaration of 'failover'.
+    failover = false;
+
+    Event event;
+    event.set_type(Event::REREGISTERED);
+    event.mutable_reregistered()->mutable_framework_id()->CopyFrom(
+        message.framework_id());
+    event.mutable_reregistered()->mutable_master_info()->CopyFrom(
+        message.master_info());
+
+    receive(from, event);
+  }
+
+  // Translates resource offers from the master into an OFFERS event.
+  void receive(const UPID& from, const ResourceOffersMessage& message)
+  {
+    Event event;
+    event.set_type(Event::OFFERS);
+    event.mutable_offers()->mutable_offers()->CopyFrom(message.offers());
+
+    receive(from, event);
+  }
+
+  // Translates an offer rescission from the master into a RESCIND event.
+  void receive(const UPID& from, const RescindResourceOfferMessage& message)
+  {
+    Event event;
+    event.set_type(Event::RESCIND);
+    event.mutable_rescind()->mutable_offer_id()->CopyFrom(message.offer_id());
+
+    receive(from, event);
+  }
+
+  // Translates a status update into an UPDATE event. The slave ID,
+  // executor ID (when present) and timestamp carried on the wrapping
+  // update are folded into the event's 'TaskStatus'; the UUID is kept
+  // on the event itself.
+  void receive(const UPID& from, const StatusUpdateMessage& message)
+  {
+    Event event;
+    event.set_type(Event::UPDATE);
+
+    Event::Update* target = event.mutable_update();
+    TaskStatus* status = target->mutable_status();
+
+    status->CopyFrom(message.update().status());
+
+    if (message.update().has_slave_id()) {
+      status->mutable_slave_id()->CopyFrom(message.update().slave_id());
+    }
+
+    if (message.update().has_executor_id()) {
+      status->mutable_executor_id()->CopyFrom(
+          message.update().executor_id());
+    }
+
+    status->set_timestamp(message.update().timestamp());
+    target->set_uuid(message.update().uuid());
+
+    receive(from, event);
+  }
+
+  // Translates a lost-slave notification into a FAILURE event.
+  void receive(const UPID& from, const LostSlaveMessage& message)
+  {
+    Event event;
+    event.set_type(Event::FAILURE);
+    event.mutable_failure()->mutable_slave_id()->CopyFrom(message.slave_id());
+
+    receive(from, event);
+  }
+
+  // Translates an executor-to-framework message into a MESSAGE event
+  // carrying the slave ID, executor ID and opaque data.
+  void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
+  {
+    Event event;
+    event.set_type(Event::MESSAGE);
+
+    Event::Message* target = event.mutable_message();
+    target->mutable_slave_id()->CopyFrom(_message.slave_id());
+    target->mutable_executor_id()->CopyFrom(_message.executor_id());
+    target->set_data(_message.data());
+
+    receive(from, event);
+  }
+
+ void receive(const UPID& from, const FrameworkErrorMessage& message)
+ {
+ Event event;
+ event.set_type(Event::ERROR);
+
+ Event::Error* error = event.mutable_error();
+
+ error->set_message(message.message());
+
+ receive(from, event);
+ }
+
+ // Helper for injecting an ERROR event.
+ void error(const string& message)
+ {
+ Event event;
+
+ event.set_type(Event::ERROR);
+
+ Event::Error* error = event.mutable_error();
+
+ error->set_message(message);
+
+ receive(None(), event);
+ }
+
+  // Helper for "dropping" a task that was launched: locally injects an
+  // UPDATE event that moves 'task' to TASK_LOST with 'message' as the
+  // explanation, a current timestamp and a freshly generated UUID.
+  void drop(const TaskInfo& task, const string& message)
+  {
+    Event event;
+    event.set_type(Event::UPDATE);
+
+    TaskStatus* status = event.mutable_update()->mutable_status();
+    status->mutable_task_id()->CopyFrom(task.task_id());
+    status->set_state(TASK_LOST);
+    status->set_message(message);
+    status->set_timestamp(Clock::now().secs());
+
+    event.mutable_update()->set_uuid(UUID::random().toBytes());
+
+    receive(None(), event);
+  }
+
+  // Logs that 'call' is being dropped. For a LAUNCH call, additionally
+  // injects a TASK_LOST update for every task it would have launched.
+  void drop(const Call& call, const string& message)
+  {
+    VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message;
+
+    // Every call type other than LAUNCH is dropped without follow-up.
+    if (call.type() != Call::LAUNCH) {
+      return;
+    }
+
+    // We drop the tasks preemptively (enqueuing update events that put
+    // each task in TASK_LOST). This is a hack for now, to keep the
+    // tasks from being forever in PENDING state when the master never
+    // actually received the launch. Unfortunately this is insufficient
+    // since it doesn't capture the case where the scheduler process
+    // sends the launch but the master never receives it (e.g., during
+    // a master failover). In the future, this should be solved by
+    // higher-level abstractions and this hack should be considered for
+    // removal.
+    foreach (const TaskInfo& task, call.launch().task_infos()) {
+      drop(task, message);
+    }
+  }
+
+private:
+ // Credential given to the 'Mesos' constructor, or None when the
+ // constructor without a credential was used. Presumably consulted
+ // when authenticating with the master (not shown here).
+ const Option<Credential> credential;
+
+ // Each callback invocation is chained behind 'mutex.lock()' and the
+ // mutex is unlocked once the callback's future completes, so at most
+ // one callback runs at a time.
+ Mutex mutex; // Used to serialize the callback invocations.
+
+ // User-supplied callbacks from the 'Mesos' constructors. 'received'
+ // is handed a whole batch of queued events (see 'events').
+ lambda::function<void(void)> connected;
+ lambda::function<void(void)> disconnected;
+ lambda::function<void(const queue<Event>&)> received;
+
+ bool local; // Whether or not we launched a local cluster.
+
+ // Whether or not this is the first time we've sent a
+ // REREGISTER. This is to maintain compatibility with what the
+ // master expects from SchedulerProcess. It is forced to false as
+ // soon as the first REGISTERED or REREGISTERED acknowledgement
+ // arrives from the master.
+ bool failover;
+
+ // Master detector; presumably what updates 'master' (not shown here).
+ MasterDetector* detector;
+
+ // Events waiting to be delivered to 'received'. Filled by 'receive()'
+ // and drained as a single batch by '_receive()'.
+ queue<Event> events;
+
+ // The currently detected leading master, or None when no master is
+ // detected. Remote events from any other sender are ignored.
+ Option<UPID> master;
+
+ // Authenticatee for the in-flight authentication attempt; deleted and
+ // reset to NULL in '_authenticate()'.
+ sasl::Authenticatee* authenticatee;
+
+ // Indicates if an authentication attempt is in progress.
+ Option<Future<bool> > authenticating;
+
+ // Indicates if the authentication is successful.
+ bool authenticated;
+
+ // Indicates if a new authentication attempt should be enforced.
+ // Cleared by '_authenticate()' when an attempt finishes.
+ bool reauthenticate;
+};
+
+
+// Creates a scheduler that talks to 'master' without a credential and
+// starts its backing 'MesosProcess'. The process is owned by this
+// object and torn down in '~Mesos()'.
+Mesos::Mesos(
+    const string& master,
+    const lambda::function<void(void)>& connected,
+    const lambda::function<void(void)>& disconnected,
+    const lambda::function<void(const queue<Event>&)>& received)
+  : process(new MesosProcess(
+        master, None(), connected, disconnected, received))
+{
+  spawn(process);
+}
+
+
+// Creates a scheduler that talks to 'master' using 'credential' and
+// starts its backing 'MesosProcess'. The process is owned by this
+// object and torn down in '~Mesos()'.
+Mesos::Mesos(
+    const string& master,
+    const Credential& credential,
+    const lambda::function<void(void)>& connected,
+    const lambda::function<void(void)>& disconnected,
+    const lambda::function<void(const queue<Event>&)>& received)
+  : process(new MesosProcess(
+        master, credential, connected, disconnected, received))
+{
+  spawn(process);
+}
+
+
+// Shuts down the backing 'MesosProcess': it is terminated, then waited
+// on, and only then deleted. The order matters — the process object
+// must not be freed while it may still be running (presumably 'wait'
+// returns once it has exited).
+Mesos::~Mesos()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+// Hands 'call' to 'MesosProcess::send' via 'dispatch'. This function
+// does not wait for the call to be handled.
+void Mesos::send(const Call& call)
+{
+ dispatch(process, &MesosProcess::send, call);
+}
+
+
+} // namespace scheduler {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 7183cb7..34df121 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -46,11 +46,12 @@
#include "master/master.hpp"
#include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/mesos_containerizer.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
@@ -134,12 +135,11 @@ TEST_F(MasterTest, TaskRunning)
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
- Future<Nothing> resourcesUpdated;
Future<Nothing> update;
EXPECT_CALL(containerizer,
update(_, Resources(offers.get()[0].resources())))
- .WillOnce(DoAll(FutureSatisfy(&resourcesUpdated),
- Return(update)));
+ .WillOnce(DoAll(FutureSatisfy(&update),
+ Return(Future<Nothing>())));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -150,7 +150,7 @@ TEST_F(MasterTest, TaskRunning)
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
- AWAIT_READY(resourcesUpdated);
+ AWAIT_READY(update);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
@@ -210,12 +210,11 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
- Future<Nothing> resourcesUpdated;
Future<Nothing> update;
EXPECT_CALL(containerizer,
update(_, Resources(offers.get()[0].resources())))
- .WillOnce(DoAll(FutureSatisfy(&resourcesUpdated),
- Return(update)));
+ .WillOnce(DoAll(FutureSatisfy(&update),
+ Return(Future<Nothing>())));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
@@ -226,7 +225,7 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
- AWAIT_READY(resourcesUpdated);
+ AWAIT_READY(update);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
http://git-wip-us.apache.org/repos/asf/mesos/blob/992ee1f6/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index bd3c22d..d87bf78 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -18,22 +18,30 @@
#include <gmock/gmock.h>
+#include <queue>
#include <vector>
+#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <process/future.hpp>
+#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
+#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <process/queue.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
#include <stout/try.hpp>
#include "master/master.hpp"
+#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using namespace mesos;
@@ -42,23 +50,167 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::scheduler::Call;
+using mesos::scheduler::Event;
+
using process::Future;
+using process::Owned;
using process::PID;
+using process::Promise;
+using process::Queue;
using process::http::OK;
using process::metrics::internal::MetricsProcess;
+using std::queue;
using std::vector;
using testing::_;
+using testing::AtMost;
+using testing::DoAll;
using testing::Return;
-class SchedulerTest : public MesosTest {};
+class SchedulerTest : public MesosTest
+{
+protected:
+  // The low-level Mesos scheduler API reports everything through
+  // callbacks; this mock gives tests an object to put EXPECT_CALLs on.
+  struct Callbacks
+  {
+    MOCK_METHOD0(connected, void(void));
+    MOCK_METHOD0(disconnected, void(void));
+    MOCK_METHOD1(received, void(const std::queue<Event>&));
+  };
+};
+
+
+// Enqueues all received events, in order, into the libprocess queue
+// pointed to by 'target'. The parameter is deliberately not named
+// 'queue' so it does not shadow 'std::queue' (brought in by
+// 'using std::queue').
+ACTION_P(Enqueue, target)
+{
+  std::queue<Event> pending = arg0;
+  for (; !pending.empty(); pending.pop()) {
+    target->put(pending.front());
+  }
+}
+
+
+// Registers a framework through the low-level 'scheduler::Mesos' API,
+// launches a task on the first offer and expects a TASK_RUNNING update.
+TEST_F(SchedulerTest, TaskRunning)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  // Declared before 'mesos' so it outlives it: the 'received'
+  // expectation below keeps a pointer to this queue, and locals are
+  // destroyed in reverse declaration order, so 'mesos' could otherwise
+  // still invoke the callback after the queue is gone.
+  Queue<Event> events;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.set_type(Call::REGISTER);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::REGISTERED, event.get().type());
+
+  FrameworkID id(event.get().registered().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+
+  // Fatal: the code below indexes 'offers(0)'. Note that protobuf's
+  // repeated-field 'size()' is an 'int', hence the signed literal.
+  ASSERT_NE(0, event.get().offers().offers().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<Nothing> update;
+  EXPECT_CALL(containerizer, update(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&update),
+                    Return(Future<Nothing>())))
+    .WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
+
+  TaskInfo taskInfo;
+  taskInfo.set_name("");
+  taskInfo.mutable_task_id()->set_value("1");
+  taskInfo.mutable_slave_id()->CopyFrom(
+      event.get().offers().offers(0).slave_id());
+  taskInfo.mutable_resources()->CopyFrom(
+      event.get().offers().offers(0).resources());
+  taskInfo.mutable_executor()->CopyFrom(DEFAULT_EXECUTOR_INFO);
+
+  // TODO(benh): Enable just running a task with a command in the tests:
+  //   taskInfo.mutable_command()->set_value("sleep 10");
+
+  {
+    Call call;
+    call.mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    call.mutable_framework_info()->mutable_id()->CopyFrom(id);
+    call.set_type(Call::LAUNCH);
+    call.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+    call.mutable_launch()->add_offer_ids()->CopyFrom(
+        event.get().offers().offers(0).id());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(TASK_RUNNING, event.get().update().status().state());
+
+  AWAIT_READY(update);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// TODO(benh): Write test for sending Call::Acknowledgement through
+// master to slave when Event::Update was generated locally.
+
+
+// Fixture kept separate from 'SchedulerTest', which targets the
+// callback-based 'scheduler::Mesos' API.
+class MesosSchedulerDriverTest : public MesosTest
+{
+};
-TEST_F(SchedulerTest, MetricsEndpoint)
+TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
[2/2] git commit: Added a concurrent 'Queue' to libprocess.
Posted by be...@apache.org.
Added a concurrent 'Queue' to libprocess.
Review: https://reviews.apache.org/r/20308
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7adb9d7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7adb9d7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7adb9d7
Branch: refs/heads/master
Commit: c7adb9d76ae4a667d74873690ab5b1a3fd3b1ad6
Parents: b4ec4eb
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Apr 12 19:17:57 2014 -0600
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Tue Jun 3 15:47:20 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 2 +
3rdparty/libprocess/include/process/queue.hpp | 87 ++++++++++++++++++++++
3rdparty/libprocess/src/tests/queue_tests.cpp | 78 +++++++++++++++++++
3 files changed, 167 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index b687068..87cf1ae 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -108,6 +108,7 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/process.hpp \
$(top_srcdir)/include/process/profiler.hpp \
$(top_srcdir)/include/process/protobuf.hpp \
+ $(top_srcdir)/include/process/queue.hpp \
$(top_srcdir)/include/process/reap.hpp \
$(top_srcdir)/include/process/run.hpp \
$(top_srcdir)/include/process/sequence.hpp \
@@ -142,6 +143,7 @@ tests_SOURCES = \
src/tests/metrics_tests.cpp \
src/tests/owned_tests.cpp \
src/tests/process_tests.cpp \
+ src/tests/queue_tests.cpp \
src/tests/reap_tests.cpp \
src/tests/sequence_tests.cpp \
src/tests/shared_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
new file mode 100644
index 0000000..548c007
--- /dev/null
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -0,0 +1,87 @@
+#ifndef __PROCESS_QUEUE_HPP__
+#define __PROCESS_QUEUE_HPP__
+
+#include <deque>
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/internal.hpp>
+#include <process/owned.hpp>
+
+#include <stout/memory.hpp>
+
+namespace process {
+
+// A queue whose 'get()' returns a future: ready immediately when an
+// element is available, otherwise satisfied by a later 'put()'. Copies
+// of a 'Queue' share the same underlying state.
+template <typename T>
+class Queue
+{
+public:
+  Queue() : data(new Data()) {}
+
+  // Adds 't'. If some 'get()' is already waiting, the oldest waiter is
+  // completed with 't' instead of 't' being stored.
+  void put(const T& t)
+  {
+    Owned<Promise<T> > waiter;
+
+    internal::acquire(&data->lock);
+    {
+      if (!data->promises.empty()) {
+        waiter = data->promises.front();
+        data->promises.pop_front();
+      } else {
+        data->elements.push(t);
+      }
+    }
+    internal::release(&data->lock);
+
+    // The waiter is completed only after the lock has been released:
+    // completing a promise may run callbacks on its future, presumably
+    // including ones that use this queue again.
+    if (waiter.get() != NULL) {
+      waiter->set(t);
+    }
+  }
+
+  // Returns the next element as a future: already ready if an element
+  // is queued, otherwise pending until a subsequent 'put()'. Waiters
+  // are served in the order their 'get()' calls were made.
+  Future<T> get()
+  {
+    Future<T> result;
+
+    internal::acquire(&data->lock);
+    {
+      if (!data->elements.empty()) {
+        result = Future<T>(data->elements.front());
+        data->elements.pop();
+      } else {
+        Owned<Promise<T> > waiter(new Promise<T>());
+        result = waiter->future();
+        data->promises.push_back(waiter);
+      }
+    }
+    internal::release(&data->lock);
+
+    return result;
+  }
+
+private:
+  // State shared by every copy of a 'Queue'.
+  struct Data
+  {
+    Data() : lock(0) {}
+
+    ~Data()
+    {
+      // TODO(benh): Fail promises?
+    }
+
+    // Rather than use a process to serialize access to the queue's
+    // internal data we use a low-level "lock" which we acquire and
+    // release using atomic builtins.
+    int lock;
+
+    // Pending "waiters": one promise per 'get()' that found no element.
+    std::deque<Owned<Promise<T> > > promises;
+
+    // Elements put in the queue that no waiter has consumed yet.
+    std::queue<T> elements;
+  };
+
+  memory::shared_ptr<Data> data;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_QUEUE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/c7adb9d7/3rdparty/libprocess/src/tests/queue_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/queue_tests.cpp b/3rdparty/libprocess/src/tests/queue_tests.cpp
new file mode 100644
index 0000000..79741d3
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/queue_tests.cpp
@@ -0,0 +1,78 @@
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <process/future.hpp>
+#include <process/queue.hpp>
+
+using namespace process;
+
+using std::string;
+
+TEST(Queue, block)
+{
+  Queue<string> queue;
+
+  // With nothing queued, 'get' hands back a future that stays pending.
+  Future<string> future = queue.get();
+  EXPECT_TRUE(future.isPending());
+
+  // The next 'put' satisfies the outstanding 'get'.
+  queue.put("hello world");
+  EXPECT_TRUE(future.isReady());
+  EXPECT_EQ("hello world", future.get());
+}
+
+
+TEST(Queue, noblock)
+{
+  Queue<string> queue;
+
+  // When an element is already queued, 'get' completes immediately.
+  queue.put("world hello");
+
+  Future<string> future = queue.get();
+  EXPECT_TRUE(future.isReady());
+  EXPECT_EQ("world hello", future.get());
+}
+
+
+TEST(Queue, queue)
+{
+  Queue<string> queue;
+
+  // Outstanding 'get's are satisfied one per 'put', strictly in the
+  // order they were issued.
+  Future<string> first = queue.get();
+  Future<string> second = queue.get();
+  Future<string> third = queue.get();
+
+  EXPECT_TRUE(first.isPending());
+  EXPECT_TRUE(second.isPending());
+  EXPECT_TRUE(third.isPending());
+
+  queue.put("hello");
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isPending());
+  EXPECT_TRUE(third.isPending());
+
+  queue.put("pretty");
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isReady());
+  EXPECT_TRUE(third.isPending());
+
+  queue.put("world");
+  EXPECT_TRUE(first.isReady());
+  EXPECT_TRUE(second.isReady());
+  EXPECT_TRUE(third.isReady());
+
+  EXPECT_EQ("hello", first.get());
+  EXPECT_EQ("pretty", second.get());
+  EXPECT_EQ("world", third.get());
+}