You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/04/13 01:37:30 UTC
[1/5] git commit: Added log implementation for state storage.
Repository: mesos
Updated Branches:
refs/heads/master 847db5288 -> 48dd007c4
Added log implementation for state storage.
Review: https://reviews.apache.org/r/19007
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/48dd007c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/48dd007c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/48dd007c
Branch: refs/heads/master
Commit: 48dd007c4d5945e2ee9f75dd4e1164a0c66f5b79
Parents: df2248b
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 25 12:57:05 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600
----------------------------------------------------------------------
src/Makefile.am | 2 +
src/messages/state.proto | 23 ++
src/state/log.cpp | 531 +++++++++++++++++++++++++++++++++++++++++
src/state/log.hpp | 45 ++++
src/tests/state_tests.cpp | 159 +++++++++++-
5 files changed, 759 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 95f133d..560b4c7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -351,10 +351,12 @@ noinst_LTLIBRARIES += libstate.la
libstate_la_SOURCES = \
state/in_memory.cpp \
state/leveldb.cpp \
+ state/log.cpp \
state/zookeeper.cpp
libstate_la_SOURCES += \
state/in_memory.hpp \
state/leveldb.hpp \
+ state/log.hpp \
state/protobuf.hpp \
state/state.hpp \
state/storage.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/messages/state.proto
----------------------------------------------------------------------
diff --git a/src/messages/state.proto b/src/messages/state.proto
index 7f7a8a5..59276e5 100644
--- a/src/messages/state.proto
+++ b/src/messages/state.proto
@@ -24,3 +24,26 @@ message Entry {
required bytes uuid = 2;
required bytes value = 3;
}
+
+
+// Describes an operation used in the log storage implementation.
+message Operation {
+ enum Type {
+ SNAPSHOT = 1;
+ EXPUNGE = 2;
+ }
+
+ // Describes a "snapshot" operation.
+ message Snapshot {
+ required Entry entry = 1;
+ }
+
+ // Describes an "expunge" operation.
+ message Expunge {
+ required string name = 1;
+ }
+
+ required Type type = 1;
+ optional Snapshot snapshot = 2;
+ optional Expunge expunge = 3;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/state/log.cpp
----------------------------------------------------------------------
diff --git a/src/state/log.cpp b/src/state/log.cpp
new file mode 100644
index 0000000..fd8b28a
--- /dev/null
+++ b/src/state/log.cpp
@@ -0,0 +1,531 @@
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <list>
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/mutex.hpp>
+#include <process/process.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "log/log.hpp"
+
+#include "state/log.hpp"
+
+using namespace mesos::internal::log;
+using namespace process;
+
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
+using std::list;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// A storage implementation for State that uses the replicated
+// log. The log is made up of appended operations. Each state entry is
+// mapped to a log "snapshot".
+//
+// All operations are gated by 'start()' which makes sure that a
+// Log::Writer has been started and all positions in the log have been
+// read and cached in memory. All reads are performed by this cache
+// (for now). If the Log::Writer gets demoted (i.e., because another
+// writer started) then the current operation will return false
+// implying the operation was not atomic and subsequent operations
+// will re-'start()' which will again read all positions to make sure
+// operations are consistent.
+// TODO(benh): Log demotion does not necessarily imply a non-atomic
+// read/modify/write. An alternative strategy might be to retry after
+// restarting via 'start' (and holding on to the mutex so no other
+// operations are attempted).
+class LogStorageProcess : public Process<LogStorageProcess>
+{
+public:
+ LogStorageProcess(Log* log);
+
+ virtual ~LogStorageProcess();
+
+ // Storage implementation.
+ Future<Option<state::Entry> > get(const string& name);
+ Future<bool> set(const state::Entry& entry, const UUID& uuid);
+ Future<bool> expunge(const state::Entry& entry);
+ Future<std::set<string> > names();
+
+protected:
+ virtual void finalize();
+
+private:
+ Future<Nothing> start();
+ Future<Nothing> _start(const Option<Log::Position>& position);
+ Future<Nothing> __start(
+ const Log::Position& beginning,
+ const Log::Position& position);
+
+ // Helper for applying log entries.
+ Future<Nothing> apply(const list<Log::Entry>& entries);
+
+ // Helper for performing truncation.
+ void truncate();
+ Future<Nothing> _truncate();
+ Future<Nothing> __truncate(
+ const Log::Position& minimum,
+ const Option<Log::Position>& position);
+
+ // Continuations.
+ Future<Option<state::Entry> > _get(const string& name);
+
+ Future<bool> _set(const state::Entry& entry, const UUID& uuid);
+ Future<bool> __set(const state::Entry& entry, const UUID& uuid);
+ Future<bool> ___set(
+ const state::Entry& entry,
+ const Option<Log::Position>& position);
+
+ Future<bool> _expunge(const state::Entry& entry);
+ Future<bool> __expunge(const state::Entry& entry);
+ Future<bool> ___expunge(
+ const state::Entry& entry,
+ const Option<Log::Position>& position);
+
+ Future<std::set<string> > _names();
+
+ Log::Reader reader;
+ Log::Writer writer;
+
+ // Used to serialize Log::Writer::append/truncate operations.
+ Mutex mutex;
+
+ // Whether or not we've started the ability to append to log.
+ Option<Future<Nothing> > starting;
+
+ // Last position in the log that we've read or written.
+ Option<Log::Position> index;
+
+ // Last position in the log up to which we've truncated.
+ Option<Log::Position> truncated;
+
+ // Note that while it would be nice to just use Operation::Snapshot
+ // modified to include a required field called 'position' we don't
+ // know the position (nor can we determine it) before we've done the
+ // actual appending of the data.
+ struct Snapshot
+ {
+ Snapshot(const Log::Position& position, const state::Entry& entry)
+ : position(position), entry(entry) {}
+
+ const Log::Position position;
+
+ // TODO(benh): Rather than storing the entire state::Entry we
+ // should just store the position, name, and UUID and cache the
+ // data so we don't use too much memory.
+ const state::Entry entry;
+ };
+
+ // All known snapshots indexed by name. Note that 'hashmap::get'
+ // must be used instead of 'operator []' since Snapshot doesn't have
+ // a default/empty constructor.
+ hashmap<string, Snapshot> snapshots;
+};
+
+
+LogStorageProcess::LogStorageProcess(Log* log)
+ : reader(log),
+ writer(log) {}
+
+
+LogStorageProcess::~LogStorageProcess() {}
+
+
+void LogStorageProcess::finalize()
+{
+ if (starting.isSome()) {
+ Future<Nothing>(starting.get()).discard();
+ }
+}
+
+
+Future<Nothing> LogStorageProcess::start()
+{
+ if (starting.isSome()) {
+ return starting.get();
+ }
+
+ starting = writer.start()
+ .then(defer(self(), &Self::_start, lambda::_1));
+
+ return starting.get();
+}
+
+
+Future<Nothing> LogStorageProcess::_start(
+ const Option<Log::Position>& position)
+{
+ CHECK_SOME(starting);
+
+ if (position.isNone()) {
+ starting = None(); // Reset 'starting' so we try again.
+ return start(); // TODO(benh): Don't try again forever?
+ }
+
+ // Now read and apply log entries. Since 'start' can be called
+ // multiple times (i.e., since we reset 'starting' after getting a
+ // None position returned after 'set', 'expunge', etc) we need to
+ // check and see if we've already successfully read the log at least
+ // once by checking 'index'. If we haven't yet read the log (i.e.,
+ // this is the first call to 'start' and 'index' is None) then we
+ // get the beginning of the log first so we can read from that up to
+ // what ever position was known at the time we started the
+ // writer. Note that it should always be safe to read a truncated
+ // entry since a subsequent operation in the log should invalidate
+ // that entry when we read it instead.
+ if (index.isSome()) {
+ // If we've started before (i.e., have an 'index' position) we
+ // should also expect know the last 'truncated' position.
+ CHECK_SOME(truncated);
+ return reader.read(index.get(), position.get())
+ .then(defer(self(), &Self::apply, lambda::_1));
+ }
+
+ return reader.beginning()
+ .then(defer(self(), &Self::__start, lambda::_1, position.get()));
+}
+
+
+Future<Nothing> LogStorageProcess::__start(
+ const Log::Position& beginning,
+ const Log::Position& position)
+{
+ CHECK_SOME(starting);
+
+ truncated = beginning; // Cache for future truncations.
+
+ return reader.read(beginning, position)
+ .then(defer(self(), &Self::apply, lambda::_1));
+}
+
+
+Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries)
+{
+ // Only read and apply entries past our index.
+ foreach (const Log::Entry& entry, entries) {
+ if (index.isNone() || index.get() < entry.position) {
+ // Parse the Operation from the Log::Entry.
+ Operation operation;
+
+ google::protobuf::io::ArrayInputStream stream(
+ entry.data.data(),
+ entry.data.size());
+
+ if (!operation.ParseFromZeroCopyStream(&stream)) {
+ return Failure("Failed to deserialize Operation");
+ }
+
+ switch (operation.type()) {
+ case Operation::SNAPSHOT: {
+ CHECK(operation.has_snapshot());
+
+ // Add or update the snapshot.
+ Snapshot snapshot(entry.position, operation.snapshot().entry());
+ snapshots.put(snapshot.entry.name(), snapshot);
+ break;
+ }
+
+ case Operation::EXPUNGE: {
+ CHECK(operation.has_expunge());
+ snapshots.erase(operation.expunge().name());
+ break;
+ }
+
+ default:
+ return Failure("Unknown operation: " + stringify(operation.type()));
+ }
+
+ index = entry.position;
+ }
+ }
+
+ return Nothing();
+}
+
+
+// TODO(benh): Truncation could be optimized by saving the "oldest"
+// snapshot and only doing a truncation if/when we update that
+// snapshot.
+// TODO(benh): Truncation is not enough to keep the log size small as
+// the log could get very fragmented. We'll need a way to defragment
+// the log as some state entries might not get set over a long period
+// of time and their associated snapshots are causing the log to grow
+// very big.
+void LogStorageProcess::truncate()
+{
+ // We lock the truncation since it includes a call to
+ // Log::Writer::truncate which must be serialized with calls to
+ // Log::Writer::append.
+ mutex.lock()
+ .then(defer(self(), &Self::_truncate))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<Nothing> LogStorageProcess::_truncate()
+{
+ // Determine the minimum necessary position for all the snapshots.
+ Option<Log::Position> minimum = None();
+
+ foreachvalue (const Snapshot& snapshot, snapshots) {
+ minimum = min(minimum, snapshot.position);
+ }
+
+ CHECK_SOME(truncated);
+
+ if (minimum.isSome() && minimum.get() > truncated.get()) {
+ return writer.truncate(minimum.get())
+ .then(defer(self(), &Self::__truncate, minimum.get(), lambda::_1));
+
+ // NOTE: Any failure from Log::Writer::truncate doesn't propagate
+ // since the expectation is any subsequent Log::Writer::append
+ // would cause a failure. Furthermore, if the failure was
+ // temporary any subsequent Log::Writer::truncate should rectify a
+ // "missing" truncation.
+ }
+
+ return Nothing();
+}
+
+
+Future<Nothing> LogStorageProcess::__truncate(
+ const Log::Position& minimum,
+ const Option<Log::Position>& position)
+{
+ // Don't bother retrying truncation if we're demoted, we'll
+ // just try again the next time 'truncate()' gets called
+ // (after we've done what's necessary to append again).
+ if (position.isSome()) {
+ truncated = max(truncated, minimum);
+ index = max(index, position);
+ }
+
+ return Nothing();
+}
+
+
+Future<Option<state::Entry> > LogStorageProcess::get(const string& name)
+{
+ return start()
+ .then(defer(self(), &Self::_get, name));
+}
+
+
+Future<Option<state::Entry> > LogStorageProcess::_get(const string& name)
+{
+ Option<Snapshot> snapshot = snapshots.get(name);
+
+ if (snapshot.isNone()) {
+ return None();
+ }
+
+ return snapshot.get().entry;
+}
+
+
+Future<bool> LogStorageProcess::set(
+ const state::Entry& entry,
+ const UUID& uuid)
+{
+ return mutex.lock()
+ .then(defer(self(), &Self::_set, entry, uuid))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<bool> LogStorageProcess::_set(
+ const state::Entry& entry,
+ const UUID& uuid)
+{
+ return start()
+ .then(defer(self(), &Self::__set, entry, uuid));
+}
+
+
+Future<bool> LogStorageProcess::__set(
+ const state::Entry& entry,
+ const UUID& uuid)
+{
+ // Check the version first (if we've already got a snapshot).
+ Option<Snapshot> snapshot = snapshots.get(entry.name());
+
+ if (snapshot.isSome()) {
+ if (UUID::fromBytes(snapshot.get().entry.uuid()) != uuid) {
+ return false;
+ }
+ }
+
+ // Now serialize and append a snapshot operation.
+ Operation operation;
+ operation.set_type(Operation::SNAPSHOT);
+ operation.mutable_snapshot()->mutable_entry()->CopyFrom(entry);
+
+ string value;
+ if (!operation.SerializeToString(&value)) {
+ return Failure("Failed to serialize Operation");
+ }
+
+ return writer.append(value)
+ .then(defer(self(), &Self::___set, entry, lambda::_1));
+}
+
+
+Future<bool> LogStorageProcess::___set(
+ const state::Entry& entry,
+ const Option<Log::Position>& position)
+{
+ if (position.isNone()) {
+ starting = None(); // Reset 'starting' so we try again.
+ return false;
+ }
+
+ // Add (or update) the snapshot for this entry and truncate
+ // the log if possible.
+ CHECK(!snapshots.contains(entry.name()) ||
+ snapshots.get(entry.name()).get().position < position.get());
+
+ Snapshot snapshot(position.get(), entry);
+ snapshots.put(snapshot.entry.name(), snapshot);
+ truncate();
+
+ // Update index so we don't bother with this position again.
+ index = max(index, position);
+
+ return true;
+}
+
+
+Future<bool> LogStorageProcess::expunge(const state::Entry& entry)
+{
+ return mutex.lock()
+ .then(defer(self(), &Self::_expunge, entry))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<bool> LogStorageProcess::_expunge(const state::Entry& entry)
+{
+ return start()
+ .then(defer(self(), &Self::__expunge, entry));
+}
+
+
+Future<bool> LogStorageProcess::__expunge(const state::Entry& entry)
+{
+ Option<Snapshot> snapshot = snapshots.get(entry.name());
+
+ if (snapshot.isNone()) {
+ return false;
+ }
+
+ // Check the version first.
+ if (UUID::fromBytes(snapshot.get().entry.uuid()) !=
+ UUID::fromBytes(entry.uuid())) {
+ return false;
+ }
+
+ // Now serialize and append an expunge operation.
+ Operation operation;
+ operation.set_type(Operation::EXPUNGE);
+ operation.mutable_expunge()->set_name(entry.name());
+
+ string value;
+ if (!operation.SerializeToString(&value)) {
+ return Failure("Failed to serialize Operation");
+ }
+
+ return writer.append(value)
+ .then(defer(self(), &Self::___expunge, entry, lambda::_1));
+}
+
+
+Future<bool> LogStorageProcess::___expunge(
+ const state::Entry& entry,
+ const Option<Log::Position>& position)
+{
+ if (position.isNone()) {
+ starting = None(); // Reset 'starting' so we try again.
+ return false;
+ }
+
+ // Remove from snapshots and truncate the log if possible.
+ CHECK(snapshots.contains(entry.name()));
+ snapshots.erase(entry.name());
+ truncate();
+
+ return true;
+}
+
+
+Future<std::set<string> > LogStorageProcess::names()
+{
+ return start()
+ .then(defer(self(), &Self::_names));
+}
+
+
+Future<std::set<string> > LogStorageProcess::_names()
+{
+ const hashset<string>& keys = snapshots.keys();
+ return std::set<string>(keys.begin(), keys.end());
+}
+
+
+LogStorage::LogStorage(Log* log)
+{
+ process = new LogStorageProcess(log);
+ spawn(process);
+}
+
+
+LogStorage::~LogStorage()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+Future<Option<state::Entry> > LogStorage::get(const string& name)
+{
+ return dispatch(process, &LogStorageProcess::get, name);
+}
+
+
+Future<bool> LogStorage::set(const state::Entry& entry, const UUID& uuid)
+{
+ return dispatch(process, &LogStorageProcess::set, entry, uuid);
+}
+
+
+Future<bool> LogStorage::expunge(const state::Entry& entry)
+{
+ return dispatch(process, &LogStorageProcess::expunge, entry);
+}
+
+
+Future<std::set<string> > LogStorage::names()
+{
+ return dispatch(process, &LogStorageProcess::names);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/state/log.hpp
----------------------------------------------------------------------
diff --git a/src/state/log.hpp b/src/state/log.hpp
new file mode 100644
index 0000000..e25d1e5
--- /dev/null
+++ b/src/state/log.hpp
@@ -0,0 +1,45 @@
+#ifndef __STATE_LOG_HPP__
+#define __STATE_LOG_HPP__
+
+#include <set>
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class LogStorageProcess;
+
+
+class LogStorage : public Storage
+{
+public:
+ LogStorage(log::Log* log);
+
+ virtual ~LogStorage();
+
+ // Storage implementation.
+ virtual process::Future<Option<Entry> > get(const std::string& name);
+ virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+ virtual process::Future<bool> expunge(const Entry& entry);
+ virtual process::Future<std::set<std::string> > names();
+
+private:
+ LogStorageProcess* process;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_LOG_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index 678b1c8..56feb27 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -26,6 +26,7 @@
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/protobuf.hpp>
+#include <process/pid.hpp>
#include <stout/gtest.hpp>
#include <stout/option.hpp>
@@ -34,14 +35,20 @@
#include "common/type_utils.hpp"
+#include "log/log.hpp"
+#include "log/replica.hpp"
+#include "log/tool/initialize.hpp"
+
#include "master/registry.hpp"
#include "state/in_memory.hpp"
#include "state/leveldb.hpp"
+#include "state/log.hpp"
#include "state/protobuf.hpp"
#include "state/storage.hpp"
#include "state/zookeeper.hpp"
+#include "tests/utils.hpp"
#ifdef MESOS_HAS_JAVA
#include "tests/zookeeper.hpp"
#endif
@@ -63,6 +70,11 @@ using state::ZooKeeperStorage;
using state::protobuf::State;
using state::protobuf::Variable;
+using std::set;
+using std::string;
+
+using mesos::internal::tests::TemporaryDirectoryTest;
+
typedef mesos::internal::Registry::Slaves Slaves;
typedef mesos::internal::Registry::Slave Slave;
@@ -400,7 +412,7 @@ protected:
State* state;
private:
- const std::string path;
+ const string path;
};
@@ -446,6 +458,151 @@ TEST_F(LevelDBStateTest, Names)
}
+class LogStateTest : public TemporaryDirectoryTest
+{
+public:
+ LogStateTest()
+ : storage(NULL),
+ state(NULL),
+ replica2(NULL),
+ log(NULL) {}
+
+protected:
+ virtual void SetUp()
+ {
+ TemporaryDirectoryTest::SetUp();
+
+ // For initializing the replicas.
+ log::tool::Initialize initializer;
+
+ string path1 = os::getcwd() + "/.log1";
+ string path2 = os::getcwd() + "/.log2";
+
+ initializer.flags.path = path1;
+ initializer.execute();
+
+ initializer.flags.path = path2;
+ initializer.execute();
+
+ // Only create the replica for 'path2' (i.e., the second replica)
+ // as the first replica will be created when we create a Log.
+ replica2 = new log::Replica(path2);
+
+ set<UPID> pids;
+ pids.insert(replica2->pid());
+
+ log = new log::Log(2, path1, pids);
+ storage = new state::LogStorage(log);
+ state = new State(storage);
+ }
+
+ virtual void TearDown()
+ {
+ delete state;
+ delete storage;
+ delete log;
+
+ delete replica2;
+
+ TemporaryDirectoryTest::TearDown();
+ }
+
+ state::Storage* storage;
+ State* state;
+
+ log::Replica* replica2;
+ log::Log* log;
+};
+
+
+TEST_F(LogStateTest, FetchAndStoreAndFetch)
+{
+ FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndStoreAndFetch)
+{
+ FetchAndStoreAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndStoreFailAndFetch)
+{
+ FetchAndStoreAndStoreFailAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndFetch)
+{
+ FetchAndStoreAndExpungeAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndExpunge)
+{
+ FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+ FetchAndStoreAndExpungeAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, Names)
+{
+ Names(state);
+}
+
+
+Future<Option<Variable<Slaves> > > timeout(
+ Future<Option<Variable<Slaves> > > future)
+{
+ future.discard();
+ return Failure("Timeout");
+}
+
+
+TEST_F(LogStateTest, Timeout)
+{
+ Clock::pause();
+
+ Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
+ AWAIT_READY(future1);
+
+ Variable<Slaves> variable = future1.get();
+
+ Slaves slaves1 = variable.get();
+ EXPECT_TRUE(slaves1.slaves().size() == 0);
+
+ Slave* slave = slaves1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+
+ variable = variable.mutate(slaves1);
+
+ // Now terminate the replica so the store will timeout.
+ terminate(replica2->pid());
+ wait(replica2->pid());
+
+ Future<Option<Variable<Slaves> > > future2 = state->store(variable);
+
+ Future<Option<Variable<Slaves> > > future3 =
+ future2.after(Seconds(5), lambda::bind(&timeout, lambda::_1));
+
+ ASSERT_TRUE(future2.isPending());
+ ASSERT_TRUE(future3.isPending());
+
+ Clock::advance(Seconds(5));
+
+ AWAIT_DISCARDED(future2);
+ AWAIT_FAILED(future3);
+
+ Clock::resume();
+}
+
+
#ifdef MESOS_HAS_JAVA
class ZooKeeperStateTest : public tests::ZooKeeperTest
{
[5/5] git commit: Used Future::after in Network.
Posted by be...@apache.org.
Used Future::after in Network.
Review: https://reviews.apache.org/r/19674
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58241c03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58241c03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58241c03
Branch: refs/heads/master
Commit: 58241c03215749fbb7452cd2f8db6b5428055753
Parents: dcff80c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Mar 11 20:44:54 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600
----------------------------------------------------------------------
src/log/network.hpp | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/58241c03/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index 488ed49..788d9eb 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -29,7 +29,6 @@
#include <process/collect.hpp>
#include <process/executor.hpp>
#include <process/protobuf.hpp>
-#include <process/timeout.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
@@ -118,6 +117,15 @@ public:
private:
typedef ZooKeeperNetwork This;
+ // Helper for handling time outs when collecting membership
+ // data. For now, a timeout is treated as a failure.
+ static process::Future<std::list<std::string> > timedout(
+ process::Future<std::list<std::string> > datas)
+ {
+ datas.discard();
+ return process::Failure("Timed out");
+ }
+
// Not copyable, not assignable.
ZooKeeperNetwork(const ZooKeeperNetwork&);
ZooKeeperNetwork& operator = (const ZooKeeperNetwork&);
@@ -411,7 +419,8 @@ inline void ZooKeeperNetwork::watched(
futures.push_back(group.data(membership));
}
- process::collect(futures, process::Timeout::in(Seconds(5)))
+ process::collect(futures)
+ .after(Seconds(5), lambda::bind(&This::timedout, lambda::_1))
.onAny(executor.defer(lambda::bind(&This::collected, this, lambda::_1)));
}
[4/5] git commit: Refactored State::names to return a set instead of
vector.
Posted by be...@apache.org.
Refactored State::names to return a set instead of vector.
Review: https://reviews.apache.org/r/19835
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df2248bb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df2248bb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df2248bb
Branch: refs/heads/master
Commit: df2248bb660f145757a21f6c423df5eabf0694ee
Parents: bafce31
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Mar 29 13:57:43 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600
----------------------------------------------------------------------
.../org_apache_mesos_state_AbstractState.cpp | 18 ++++++++---------
src/state/in_memory.cpp | 11 +++++-----
src/state/in_memory.hpp | 4 ++--
src/state/leveldb.cpp | 15 +++++++-------
src/state/leveldb.hpp | 4 ++--
src/state/state.hpp | 6 +++---
src/state/storage.hpp | 4 ++--
src/state/zookeeper.cpp | 21 +++++++++++---------
src/state/zookeeper.hpp | 4 ++--
src/tests/state_tests.cpp | 8 +++++---
10 files changed, 51 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/java/jni/org_apache_mesos_state_AbstractState.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_AbstractState.cpp b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
index 476bb27..1accc8a 100644
--- a/src/java/jni/org_apache_mesos_state_AbstractState.cpp
+++ b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
@@ -1,7 +1,7 @@
#include <jni.h>
+#include <set>
#include <string>
-#include <vector>
#include <process/check.hpp>
#include <process/future.hpp>
@@ -18,8 +18,8 @@ using namespace mesos::internal::state;
using process::Future;
+using std::set;
using std::string;
-using std::vector;
extern "C" {
@@ -614,8 +614,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_AbstractState__1_1names
State* state = (State*) env->GetLongField(thiz, __state);
- Future<vector<string> >* future =
- new Future<vector<string> >(state->names());
+ Future<set<string> >* future =
+ new Future<set<string> >(state->names());
return (jlong) future;
}
@@ -629,7 +629,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_AbstractState__1_1names
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1cancel
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+ Future<set<string> >* future = (Future<set<string> >*) jfuture;
// We'll initiate a discard but we won't consider it cancelled since
// we don't know if/when the future will get discarded.
@@ -663,7 +663,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1is_1done
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+ Future<set<string> >* future = (Future<set<string> >*) jfuture;
return (jboolean) !future->isPending() || future->hasDiscard();
}
@@ -677,7 +677,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1get
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+ Future<set<string> >* future = (Future<set<string> >*) jfuture;
future->await();
@@ -725,7 +725,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1get_1timeout
(JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+ Future<set<string> >* future = (Future<set<string> >*) jfuture;
jclass clazz = env->GetObjectClass(junit);
@@ -787,7 +787,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
JNIEXPORT void JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1finalize
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+ Future<set<string> >* future = (Future<set<string> >*) jfuture;
delete future;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/in_memory.cpp
----------------------------------------------------------------------
diff --git a/src/state/in_memory.cpp b/src/state/in_memory.cpp
index b184565..ce04e47 100644
--- a/src/state/in_memory.cpp
+++ b/src/state/in_memory.cpp
@@ -1,5 +1,5 @@
+#include <set>
#include <string>
-#include <vector>
#include <process/dispatch.hpp>
#include <process/future.hpp>
@@ -16,8 +16,9 @@
using namespace process;
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
using std::string;
-using std::vector;
namespace mesos {
namespace internal {
@@ -60,10 +61,10 @@ public:
return true;
}
- vector<string> names()
+ std::set<string> names() // Use std:: to disambiguate 'set' member.
{
const hashset<string>& keys = entries.keys();
- return vector<string>(keys.begin(), keys.end());
+ return std::set<string>(keys.begin(), keys.end());
}
private:
@@ -105,7 +106,7 @@ Future<bool> InMemoryStorage::expunge(const Entry& entry)
}
-Future<vector<string> > InMemoryStorage::names()
+Future<std::set<string> > InMemoryStorage::names()
{
return dispatch(process, &InMemoryStorageProcess::names);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/in_memory.hpp
----------------------------------------------------------------------
diff --git a/src/state/in_memory.hpp b/src/state/in_memory.hpp
index 822a89a..2040618 100644
--- a/src/state/in_memory.hpp
+++ b/src/state/in_memory.hpp
@@ -1,8 +1,8 @@
#ifndef __STATE_IN_MEMORY_HPP__
#define __STATE_IN_MEMORY_HPP__
+#include <set>
#include <string>
-#include <vector>
#include <process/future.hpp>
@@ -31,7 +31,7 @@ public:
virtual process::Future<Option<Entry> > get(const std::string& name);
virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
virtual process::Future<bool> expunge(const Entry& entry);
- virtual process::Future<std::vector<std::string> > names();
+ virtual process::Future<std::set<std::string> > names();
private:
InMemoryStorageProcess* process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 6db3ecf..4303df3 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -4,8 +4,8 @@
#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+#include <set>
#include <string>
-#include <vector>
#include <process/dispatch.hpp>
#include <process/future.hpp>
@@ -27,8 +27,9 @@
using namespace process;
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
using std::string;
-using std::vector;
namespace mesos {
namespace internal {
@@ -47,7 +48,7 @@ public:
Future<Option<Entry> > get(const string& name);
Future<bool> set(const Entry& entry, const UUID& uuid);
Future<bool> expunge(const Entry& entry);
- Future<vector<string> > names();
+ Future<std::set<string> > names();
private:
// Helpers for interacting with leveldb.
@@ -88,20 +89,20 @@ void LevelDBStorageProcess::initialize()
}
-Future<vector<string> > LevelDBStorageProcess::names()
+Future<std::set<string> > LevelDBStorageProcess::names()
{
if (error.isSome()) {
return Failure(error.get());
}
- vector<string> results;
+ std::set<string> results;
leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
iterator->SeekToFirst();
while (iterator->Valid()) {
- results.push_back(iterator->key().ToString());
+ results.insert(iterator->key().ToString());
iterator->Next();
}
@@ -287,7 +288,7 @@ Future<bool> LevelDBStorage::expunge(const Entry& entry)
}
-Future<vector<string> > LevelDBStorage::names()
+Future<std::set<string> > LevelDBStorage::names()
{
return dispatch(process, &LevelDBStorageProcess::names);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.hpp b/src/state/leveldb.hpp
index f2d6d27..53447c6 100644
--- a/src/state/leveldb.hpp
+++ b/src/state/leveldb.hpp
@@ -1,8 +1,8 @@
#ifndef __STATE_LEVELDB_HPP__
#define __STATE_LEVELDB_HPP__
+#include <set>
#include <string>
-#include <vector>
#include <process/future.hpp>
@@ -32,7 +32,7 @@ public:
virtual process::Future<Option<Entry> > get(const std::string& name);
virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
virtual process::Future<bool> expunge(const Entry& entry);
- virtual process::Future<std::vector<std::string> > names();
+ virtual process::Future<std::set<std::string> > names();
private:
LevelDBStorageProcess* process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/state.hpp
----------------------------------------------------------------------
diff --git a/src/state/state.hpp b/src/state/state.hpp
index bebfe02..2c0bb20 100644
--- a/src/state/state.hpp
+++ b/src/state/state.hpp
@@ -19,8 +19,8 @@
#ifndef __STATE_STATE_HPP__
#define __STATE_STATE_HPP__
+#include <set>
#include <string>
-#include <vector>
#include <process/deferred.hpp> // TODO(benh): This is required by Clang.
#include <process/future.hpp>
@@ -111,7 +111,7 @@ public:
process::Future<bool> expunge(const Variable& variable);
// Returns the collection of variable names in the state.
- process::Future<std::vector<std::string> > names();
+ process::Future<std::set<std::string> > names();
private:
// Helpers to handle future results from fetch and swap. We make
@@ -189,7 +189,7 @@ inline process::Future<bool> State::expunge(const Variable& variable)
}
-inline process::Future<std::vector<std::string> > State::names()
+inline process::Future<std::set<std::string> > State::names()
{
return storage->names();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/storage.hpp
----------------------------------------------------------------------
diff --git a/src/state/storage.hpp b/src/state/storage.hpp
index a137075..a36a93e 100644
--- a/src/state/storage.hpp
+++ b/src/state/storage.hpp
@@ -19,8 +19,8 @@
#ifndef __STATE_STORAGE_HPP__
#define __STATE_STORAGE_HPP__
+#include <set>
#include <string>
-#include <vector>
#include <process/future.hpp>
@@ -50,7 +50,7 @@ public:
virtual process::Future<bool> expunge(const Entry& entry) = 0;
// Returns the collection of variable names in the state.
- virtual process::Future<std::vector<std::string> > names() = 0;
+ virtual process::Future<std::set<std::string> > names() = 0;
};
} // namespace state {
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index dd632c0..a833eaa 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -3,6 +3,7 @@
#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
#include <queue>
+#include <set>
#include <string>
#include <vector>
@@ -33,6 +34,8 @@
using namespace process;
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
using std::queue;
using std::string;
using std::vector;
@@ -60,7 +63,7 @@ public:
Future<Option<Entry> > get(const string& name);
Future<bool> set(const Entry& entry, const UUID& uuid);
virtual Future<bool> expunge(const Entry& entry);
- Future<vector<string> > names();
+ Future<std::set<string> > names();
// ZooKeeper events.
void connected(bool reconnect);
@@ -72,7 +75,7 @@ public:
private:
// Helpers for getting the names, fetching, and swapping.
- Result<vector<string> > doNames();
+ Result<std::set<string> > doNames();
Result<Option<Entry> > doGet(const string& name);
Result<bool> doSet(const Entry& entry, const UUID& uuid);
Result<bool> doExpunge(const Entry& entry);
@@ -99,7 +102,7 @@ private:
struct Names
{
- Promise<vector<string> > promise;
+ Promise<std::set<string> > promise;
};
struct Get
@@ -191,7 +194,7 @@ void ZooKeeperStorageProcess::initialize()
}
-Future<vector<string> > ZooKeeperStorageProcess::names()
+Future<std::set<string> > ZooKeeperStorageProcess::names()
{
if (error.isSome()) {
return Failure(error.get());
@@ -201,7 +204,7 @@ Future<vector<string> > ZooKeeperStorageProcess::names()
return names->promise.future();
}
- Result<vector<string> > result = doNames();
+ Result<std::set<string> > result = doNames();
if (result.isNone()) { // Try again later.
Names* names = new Names();
@@ -308,7 +311,7 @@ void ZooKeeperStorageProcess::connected(bool reconnect)
while (!pending.names.empty()) {
Names* names = pending.names.front();
- Result<vector<string> > result = doNames();
+ Result<std::set<string> > result = doNames();
if (result.isNone()) {
return; // Try again later.
} else if (result.isError()) {
@@ -385,7 +388,7 @@ void ZooKeeperStorageProcess::deleted(const string& path)
}
-Result<vector<string> > ZooKeeperStorageProcess::doNames()
+Result<std::set<string> > ZooKeeperStorageProcess::doNames()
{
// Get all children to determine current memberships.
vector<string> results;
@@ -404,7 +407,7 @@ Result<vector<string> > ZooKeeperStorageProcess::doNames()
// TODO(benh): It might make sense to "mangle" the names so that we
// can determine when a znode has incorrectly been added that
// actually doesn't store an Entry.
- return results;
+ return std::set<string>(results.begin(), results.end());
}
@@ -627,7 +630,7 @@ Future<bool> ZooKeeperStorage::expunge(const Entry& entry)
}
-Future<vector<string> > ZooKeeperStorage::names()
+Future<std::set<string> > ZooKeeperStorage::names()
{
return dispatch(process, &ZooKeeperStorageProcess::names);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.hpp b/src/state/zookeeper.hpp
index 420315b..1a8483d 100644
--- a/src/state/zookeeper.hpp
+++ b/src/state/zookeeper.hpp
@@ -1,8 +1,8 @@
#ifndef __STATE_ZOOKEEPER_HPP__
#define __STATE_ZOOKEEPER_HPP__
+#include <set>
#include <string>
-#include <vector>
#include <process/future.hpp>
@@ -39,7 +39,7 @@ public:
virtual process::Future<Option<Entry> > get(const std::string& name);
virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
virtual process::Future<bool> expunge(const Entry& entry);
- virtual process::Future<std::vector<std::string> > names();
+ virtual process::Future<std::set<std::string> > names();
private:
ZooKeeperStorageProcess* process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index d0e0840..678b1c8 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -20,7 +20,6 @@
#include <set>
#include <string>
-#include <vector>
#include <mesos/mesos.hpp>
@@ -52,6 +51,9 @@ using namespace mesos::internal;
using namespace process;
+using std::set;
+using std::string;
+
using state::LevelDBStorage;
using state::Storage;
#ifdef MESOS_HAS_JAVA
@@ -297,10 +299,10 @@ void Names(State* state)
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
- Future<std::vector<std::string> > names = state->names();
+ Future<set<string> > names = state->names();
AWAIT_READY(names);
ASSERT_TRUE(names.get().size() == 1);
- EXPECT_EQ("slaves", names.get()[0]);
+ EXPECT_NE(names.get().find("slaves"), names.get().end());
}
[3/5] git commit: Removed timeouts in interfaces in favor of
Future::after.
Posted by be...@apache.org.
Removed timeouts in interfaces in favor of Future::after.
Review: https://reviews.apache.org/r/19672
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dcff80c9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dcff80c9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dcff80c9
Branch: refs/heads/master
Commit: dcff80c97aaac761e3a460e7dc9510940e666c0f
Parents: 847db52
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Mar 11 20:44:27 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600
----------------------------------------------------------------------
3rdparty/libprocess/include/process/collect.hpp | 72 +++-----------------
1 file changed, 8 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dcff80c9/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index f4d39b7..3bee8a6 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -1,21 +1,15 @@
#ifndef __PROCESS_COLLECT_HPP__
#define __PROCESS_COLLECT_HPP__
-#include <assert.h>
-
#include <list>
#include <process/check.hpp>
#include <process/defer.hpp>
-#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
-#include <process/timeout.hpp>
#include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
#include <stout/tuple.hpp>
// TODO(bmahler): Move these into a futures.hpp header to group Future
@@ -28,17 +22,13 @@ namespace process {
// the result will be a failure. Likewise, if any future fails then
// the result future will be a failure.
template <typename T>
-Future<std::list<T> > collect(
- const std::list<Future<T> >& futures,
- const Option<Timeout>& timeout = None());
+Future<std::list<T> > collect(const std::list<Future<T> >& futures);
// Waits on each future in the specified set and returns the list of
-// non-pending futures. On timeout, the result will be a failure.
+// non-pending futures.
template <typename T>
-Future<std::list<Future<T> > > await(
- const std::list<Future<T> >& futures,
- const Option<Timeout>& timeout = None());
+Future<std::list<Future<T> > > await(const std::list<Future<T> >& futures);
// Waits on each future specified and returns the wrapping future
@@ -57,10 +47,8 @@ class CollectProcess : public Process<CollectProcess<T> >
public:
CollectProcess(
const std::list<Future<T> >& _futures,
- const Option<Timeout>& _timeout,
Promise<std::list<T> >* _promise)
: futures(_futures),
- timeout(_timeout),
promise(_promise),
ready(0) {}
@@ -74,11 +62,6 @@ public:
// Stop this nonsense if nobody cares.
promise->future().onDiscard(defer(this, &CollectProcess::discarded));
- // Only wait as long as requested.
- if (timeout.isSome()) {
- delay(timeout.get().remaining(), this, &CollectProcess::timedout);
- }
-
typename std::list<Future<T> >::const_iterator iterator;
for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
(*iterator).onAny(defer(this, &CollectProcess::waited, lambda::_1));
@@ -92,20 +75,6 @@ private:
terminate(this);
}
- void timedout()
- {
- // Need to discard all of the futures so any of their associated
- // resources can get properly cleaned up.
- typename std::list<Future<T> >::const_iterator iterator;
- for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
- Future<T> future = *iterator; // Need a non-const copy to discard.
- future.discard();
- }
-
- promise->fail("Collect failed: timed out");
- terminate(this);
- }
-
void waited(const Future<T>& future)
{
if (future.isFailed()) {
@@ -129,7 +98,6 @@ private:
}
const std::list<Future<T> > futures;
- const Option<Timeout> timeout;
Promise<std::list<T> >* promise;
size_t ready;
};
@@ -141,10 +109,8 @@ class AwaitProcess : public Process<AwaitProcess<T> >
public:
AwaitProcess(
const std::list<Future<T> >& _futures,
- const Option<Timeout>& _timeout,
Promise<std::list<Future<T> > >* _promise)
: futures(_futures),
- timeout(_timeout),
promise(_promise),
ready(0) {}
@@ -158,11 +124,6 @@ public:
// Stop this nonsense if nobody cares.
promise->future().onDiscard(defer(this, &AwaitProcess::discarded));
- // Only wait as long as requested.
- if (timeout.isSome()) {
- delay(timeout.get().remaining(), this, &AwaitProcess::timedout);
- }
-
typename std::list<Future<T> >::const_iterator iterator;
for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
(*iterator).onAny(defer(this, &AwaitProcess::waited, lambda::_1));
@@ -176,23 +137,9 @@ private:
terminate(this);
}
- void timedout()
- {
- // Need to discard all of the futures so any of their associated
- // resources can get properly cleaned up.
- typename std::list<Future<T> >::const_iterator iterator;
- for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
- Future<T> future = *iterator; // Need a non-const copy to discard.
- future.discard();
- }
-
- promise->fail("Collect failed: timed out");
- terminate(this);
- }
-
void waited(const Future<T>& future)
{
- assert(!future.isPending());
+ CHECK(!future.isPending());
ready += 1;
if (ready == futures.size()) {
@@ -202,7 +149,6 @@ private:
}
const std::list<Future<T> > futures;
- const Option<Timeout> timeout;
Promise<std::list<Future<T> > >* promise;
size_t ready;
};
@@ -233,8 +179,7 @@ Future<tuples::tuple<Future<T1>, Future<T2> > > __await(
template <typename T>
inline Future<std::list<T> > collect(
- const std::list<Future<T> >& futures,
- const Option<Timeout>& timeout)
+ const std::list<Future<T> >& futures)
{
if (futures.empty()) {
return std::list<T>();
@@ -242,15 +187,14 @@ inline Future<std::list<T> > collect(
Promise<std::list<T> >* promise = new Promise<std::list<T> >();
Future<std::list<T> > future = promise->future();
- spawn(new internal::CollectProcess<T>(futures, timeout, promise), true);
+ spawn(new internal::CollectProcess<T>(futures, promise), true);
return future;
}
template <typename T>
inline Future<std::list<Future<T> > > await(
- const std::list<Future<T> >& futures,
- const Option<Timeout>& timeout)
+ const std::list<Future<T> >& futures)
{
if (futures.empty()) {
return futures;
@@ -259,7 +203,7 @@ inline Future<std::list<Future<T> > > await(
Promise<std::list<Future<T> > >* promise =
new Promise<std::list<Future<T> > >();
Future<std::list<Future<T> > > future = promise->future();
- spawn(new internal::AwaitProcess<T>(futures, timeout, promise), true);
+ spawn(new internal::AwaitProcess<T>(futures, promise), true);
return future;
}
[2/5] git commit: Cleaned up master detector usage in tests.
Posted by be...@apache.org.
Cleaned up master detector usage in tests.
Review: https://reviews.apache.org/r/19834
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bafce319
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bafce319
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bafce319
Branch: refs/heads/master
Commit: bafce31921254459b0cf426f2d72637ec38a135f
Parents: 58241c0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Mar 29 14:06:41 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600
----------------------------------------------------------------------
src/tests/cluster.hpp | 121 +++++++++++----------
src/tests/fault_tolerance_tests.cpp | 98 ++++++++---------
src/tests/master_contender_detector_tests.cpp | 5 +-
src/tests/master_tests.cpp | 17 ++-
src/tests/mesos.cpp | 6 +-
src/tests/mesos.hpp | 6 +-
6 files changed, 127 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 11684d9..8479fe3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -91,10 +91,9 @@ public:
// the launched master. If no allocator process is specified then
// the default allocator will be instantiated.
Try<process::PID<master::Master> > start(
- Option<master::allocator::AllocatorProcess*> allocatorProcess,
+ const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
const master::Flags& flags = master::Flags());
-
// Stops and cleans up a master at the specified PID.
Try<Nothing> stop(const process::PID<master::Master>& pid);
@@ -110,9 +109,9 @@ public:
Option<zookeeper::URL> url;
// Encapsulates a single master's dependencies.
- struct MasterInfo
+ struct Master
{
- MasterInfo()
+ Master()
: master(NULL),
allocator(NULL),
allocatorProcess(NULL),
@@ -130,7 +129,7 @@ public:
MasterDetector* detector;
};
- std::map<process::PID<master::Master>, MasterInfo> masters;
+ std::map<process::PID<master::Master>, Master> masters;
};
// Abstracts the slaves of a cluster.
@@ -159,12 +158,12 @@ public:
// Detector. The detector is expected to outlive the launched
// slave (i.e., until it is stopped via Slaves::stop).
Try<process::PID<slave::Slave> > start(
- process::Owned<MasterDetector> detector,
+ const Option<MasterDetector*>& detector,
const slave::Flags& flags = slave::Flags());
Try<process::PID<slave::Slave> > start(
slave::Containerizer* containerizer,
- process::Owned<MasterDetector> detector,
+ const Option<MasterDetector*>& detector,
const slave::Flags& flags = slave::Flags());
// Stops and cleans up a slave at the specified PID. If 'shutdown'
@@ -239,7 +238,7 @@ inline Cluster::Masters::~Masters()
inline void Cluster::Masters::shutdown()
{
// TODO(benh): Use utils::copy from stout once namespaced.
- std::map<process::PID<master::Master>, MasterInfo> copy(masters);
+ std::map<process::PID<master::Master>, Master> copy(masters);
foreachkey (const process::PID<master::Master>& pid, copy) {
stop(pid);
}
@@ -255,7 +254,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
inline Try<process::PID<master::Master> > Cluster::Masters::start(
- Option<master::allocator::AllocatorProcess*> allocatorProcess,
+ const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
const master::Flags& flags)
{
// Disallow multiple masters when not using ZooKeeper.
@@ -263,16 +262,16 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
return Error("Can not start multiple masters when not using ZooKeeper");
}
- MasterInfo masterInfo;
+ Master master;
if (allocatorProcess.isNone()) {
- masterInfo.allocatorProcess =
+ master.allocatorProcess =
new master::allocator::HierarchicalDRFAllocatorProcess();
- masterInfo.allocator =
- new master::allocator::Allocator(masterInfo.allocatorProcess);
+ master.allocator =
+ new master::allocator::Allocator(master.allocatorProcess);
} else {
- masterInfo.allocatorProcess = NULL;
- masterInfo.allocator =
+ master.allocatorProcess = NULL;
+ master.allocator =
new master::allocator::Allocator(allocatorProcess.get());
}
@@ -281,44 +280,44 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
}
if (flags.registry == "in_memory") {
- masterInfo.storage = new state::InMemoryStorage();
+ master.storage = new state::InMemoryStorage();
} else {
return Error("'" + flags.registry + "' is not a supported"
" option for registry persistence");
}
- CHECK_NOTNULL(masterInfo.storage);
+ CHECK_NOTNULL(master.storage);
- masterInfo.state = new state::protobuf::State(masterInfo.storage);
- masterInfo.registrar = new master::Registrar(flags, masterInfo.state);
- masterInfo.repairer = new master::Repairer();
+ master.state = new state::protobuf::State(master.storage);
+ master.registrar = new master::Registrar(flags, master.state);
+ master.repairer = new master::Repairer();
if (url.isSome()) {
- masterInfo.contender = new ZooKeeperMasterContender(url.get());
- masterInfo.detector = new ZooKeeperMasterDetector(url.get());
+ master.contender = new ZooKeeperMasterContender(url.get());
+ master.detector = new ZooKeeperMasterDetector(url.get());
} else {
- masterInfo.contender = new StandaloneMasterContender();
- masterInfo.detector = new StandaloneMasterDetector();
+ master.contender = new StandaloneMasterContender();
+ master.detector = new StandaloneMasterDetector();
}
- masterInfo.master = new master::Master(
- masterInfo.allocator,
- masterInfo.registrar,
- masterInfo.repairer,
+ master.master = new master::Master(
+ master.allocator,
+ master.registrar,
+ master.repairer,
&cluster->files,
- masterInfo.contender,
- masterInfo.detector,
+ master.contender,
+ master.detector,
flags);
if (url.isNone()) {
// This means we are using the StandaloneMasterDetector.
- CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(masterInfo.detector))
- ->appoint(masterInfo.master->info());
+ CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(master.detector))
+ ->appoint(master.master->info());
}
- process::PID<master::Master> pid = process::spawn(masterInfo.master);
+ process::PID<master::Master> pid = process::spawn(master.master);
- masters[pid] = masterInfo;
+ masters[pid] = master;
return pid;
}
@@ -331,22 +330,22 @@ inline Try<Nothing> Cluster::Masters::stop(
return Error("No master found to stop");
}
- MasterInfo masterInfo = masters[pid];
+ Master master = masters[pid];
- process::terminate(masterInfo.master);
- process::wait(masterInfo.master);
- delete masterInfo.master;
+ process::terminate(master.master);
+ process::wait(master.master);
+ delete master.master;
- delete masterInfo.allocator; // Terminates and waits for allocator process.
- delete masterInfo.allocatorProcess; // May be NULL.
+ delete master.allocator; // Terminates and waits for allocator process.
+ delete master.allocatorProcess; // May be NULL.
- delete masterInfo.registrar;
- delete masterInfo.repairer;
- delete masterInfo.state;
- delete masterInfo.storage;
+ delete master.registrar;
+ delete master.repairer;
+ delete master.state;
+ delete master.storage;
- delete masterInfo.contender;
- delete masterInfo.detector;
+ delete master.contender;
+ delete master.detector;
masters.erase(pid);
@@ -422,12 +421,12 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
slave::Containerizer* containerizer,
const slave::Flags& flags)
{
- return start(containerizer, masters->detector(), flags);
+ return start(containerizer, None(), flags);
}
inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
- process::Owned<MasterDetector> detector,
+ const Option<MasterDetector*>& detector,
const slave::Flags& flags)
{
// TODO(benh): Create a work directory if using the default.
@@ -443,11 +442,17 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
slave.containerizer = containerizer.get();
- // Get a detector for the master(s).
- slave.detector = detector;
+ // Get a detector for the master(s) if one wasn't provided.
+ if (detector.isNone()) {
+ slave.detector = masters->detector();
+ }
slave.slave = new slave::Slave(
- flags, slave.detector.get(), slave.containerizer, &cluster->files);
+ flags,
+ detector.get(slave.detector.get()),
+ slave.containerizer,
+ &cluster->files);
+
process::PID<slave::Slave> pid = process::spawn(slave.slave);
slaves[pid] = slave;
@@ -458,7 +463,7 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
slave::Containerizer* containerizer,
- process::Owned<MasterDetector> detector,
+ const Option<MasterDetector*>& detector,
const slave::Flags& flags)
{
// TODO(benh): Create a work directory if using the default.
@@ -467,11 +472,17 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
slave.flags = flags;
- // Get a detector for the master(s).
- slave.detector = detector;
+ // Get a detector for the master(s) if one wasn't provided.
+ if (detector.isNone()) {
+ slave.detector = masters->detector();
+ }
slave.slave = new slave::Slave(
- flags, slave.detector.get(), containerizer, &cluster->files);
+ flags,
+ detector.get(slave.detector.get()),
+ containerizer,
+ &cluster->files);
+
process::PID<slave::Slave> pid = process::spawn(slave.slave);
slaves[pid] = slave;
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 99311c3..4796149 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -222,10 +222,9 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
+ StandaloneMasterDetector detector(master.get());
- Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -327,7 +326,7 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
// We now complete the partition on the slave side as well. This
// is done by simulating a master loss event which would normally
// occur during a network partition.
- detector->appoint(None());
+ detector.appoint(None());
Future<Nothing> shutdown;
EXPECT_CALL(exec, shutdown(_))
@@ -336,7 +335,7 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
// Have the slave re-register with the master.
- detector->appoint(master.get());
+ detector.appoint(master.get());
// Upon re-registration, the master will shutdown the slave.
// The slave will then shut down the executor.
@@ -609,9 +608,8 @@ TEST_F(FaultToleranceTest, MasterFailover)
ASSERT_SOME(master);
MockScheduler sched;
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
- TestingMesosSchedulerDriver driver(&sched, detector);
+ StandaloneMasterDetector detector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, &detector);
Future<process::Message> frameworkRegisteredMessage =
FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -641,7 +639,7 @@ TEST_F(FaultToleranceTest, MasterFailover)
.WillOnce(FutureSatisfy(®istered2));
// Simulate a new master detected message to the scheduler.
- detector->appoint(master.get());
+ detector.appoint(master.get());
// Scheduler should retry authentication.
AWAIT_READY(authenticateMessage);
@@ -652,8 +650,6 @@ TEST_F(FaultToleranceTest, MasterFailover)
driver.stop();
driver.join();
- delete detector;
-
Shutdown();
}
@@ -681,12 +677,12 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
ASSERT_SOME(master);
MockExecutor executor(DEFAULT_EXECUTOR_ID);
+
TestContainerizer containerizer(&executor);
- Owned<MasterDetector> slaveDetector(
- new StandaloneMasterDetector(master.get()));
+ StandaloneMasterDetector slaveDetector(master.get());
- Try<PID<Slave> > slave = StartSlave(&containerizer, slaveDetector);
+ Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
ASSERT_SOME(slave);
// Verify master/slave have 0 completed/running frameworks.
@@ -810,8 +806,8 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
- dynamic_cast<StandaloneMasterDetector*>(slaveDetector.get())->appoint(
- master.get());
+ // Simulate a new master detected message to the slave.
+ slaveDetector.appoint(master.get());
AWAIT_READY(slaveReregisteredMessage);
@@ -1045,19 +1041,17 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- Owned<MasterDetector> slaveDetector(
- new StandaloneMasterDetector(master.get()));
- Try<PID<Slave> > slave = StartSlave(slaveDetector);
- ASSERT_SOME(slave);
+ StandaloneMasterDetector slaveDetector(master.get());
+ Try<PID<Slave> > slave = StartSlave(&slaveDetector);
+ ASSERT_SOME(slave);
// Create a detector for the scheduler driver because we want the
// spurious leading master change to be known by the scheduler
// driver only.
- Owned<MasterDetector> schedDetector(
- new StandaloneMasterDetector(master.get()));
+ StandaloneMasterDetector schedDetector(master.get());
MockScheduler sched;
- TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
+ TestingMesosSchedulerDriver driver(&sched, &schedDetector);
Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
@@ -1093,8 +1087,7 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
.Times(AtMost(1));
// Simulate a spurious leading master change at the scheduler.
- dynamic_cast<StandaloneMasterDetector*>(schedDetector.get())->appoint(
- master.get());
+ schedDetector.appoint(master.get());
AWAIT_READY(disconnected);
@@ -1119,9 +1112,8 @@ TEST_F(FaultToleranceTest, TaskLost)
ASSERT_SOME(slave);
MockScheduler sched;
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
- TestingMesosSchedulerDriver driver(&sched, detector);
+ StandaloneMasterDetector detector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, &detector);
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -1145,7 +1137,7 @@ TEST_F(FaultToleranceTest, TaskLost)
.WillOnce(FutureSatisfy(&disconnected));
// Simulate a spurious master loss event at the scheduler.
- detector->appoint(None());
+ detector.appoint(None());
AWAIT_READY(disconnected);
@@ -1301,15 +1293,14 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- Owned<MasterDetector> slaveDetector(
- new StandaloneMasterDetector(master.get()));
- Try<PID<Slave> > slave = StartSlave(&containerizer, slaveDetector);
+ StandaloneMasterDetector slaveDetector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
ASSERT_SOME(slave);
MockScheduler sched;
- Owned<StandaloneMasterDetector> schedDetector(
- new StandaloneMasterDetector(master.get()));
- TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
+ StandaloneMasterDetector schedDetector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, &schedDetector);
Future<process::Message> frameworkRegisteredMessage =
FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -1361,8 +1352,7 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
- dynamic_cast<StandaloneMasterDetector*>(slaveDetector.get())->appoint(
- master.get());
+ slaveDetector.appoint(master.get());
// Wait for the slave to re-register.
AWAIT_READY(slaveReregisteredMessage);
@@ -1386,7 +1376,7 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
EXPECT_CALL(sched, registered(&driver, _, _));
- schedDetector->appoint(master.get());
+ schedDetector.appoint(master.get());
AWAIT_READY(frameworkRegisteredMessage2);
@@ -1822,8 +1812,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- StandaloneMasterDetector* detector = new StandaloneMasterDetector(master.get());
- Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&detector);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -1848,7 +1839,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
// Simulate a spurious master change event (e.g., due to ZooKeeper
// expiration) at the slave.
- detector->appoint(master.get());
+ detector.appoint(master.get());
AWAIT_READY(slaveReregisteredMessage);
@@ -1872,10 +1863,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
- Try<PID<Slave> > slave =
- StartSlave(&containerizer, Owned<MasterDetector>(detector));
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -1923,7 +1913,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status2));
- detector->appoint(master.get());
+ detector.appoint(master.get());
AWAIT_READY(status2);
EXPECT_EQ(TASK_LOST, status2.get().state());
@@ -1943,9 +1933,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
- Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&detector);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -1993,7 +1983,7 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
// Simulate a spurious master change event (e.g., due to ZooKeeper
// expiration) at the slave to force re-registration.
- detector->appoint(master.get());
+ detector.appoint(master.get());
AWAIT_READY(slaveReregisteredMessage);
@@ -2019,9 +2009,9 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
MockExecutor exec(DEFAULT_EXECUTOR_ID);
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
- Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -2078,7 +2068,7 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
// Simulate a spurious master change event (e.g., due to ZooKeeper
// expiration) at the slave to force re-registration.
- detector->appoint(master.get());
+ detector.appoint(master.get());
AWAIT_READY(slaveReregisteredMessage);
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index 8da7420..42051bf 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -102,7 +102,8 @@ TEST_F(MasterContenderDetectorTest, File)
ASSERT_SOME(detector);
- StartSlave(Owned<MasterDetector>(detector.get()), flags);
+ Try<PID<Slave> > slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
@@ -123,6 +124,8 @@ TEST_F(MasterContenderDetectorTest, File)
driver.join();
Shutdown();
+
+ delete detector.get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 599f4a0..c047397 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -722,14 +722,13 @@ TEST_F(MasterTest, MasterInfoOnReElection)
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- StandaloneMasterDetector* detector =
- new StandaloneMasterDetector(master.get());
+ StandaloneMasterDetector detector(master.get());
- Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+ Try<PID<Slave> > slave = StartSlave(&detector);
ASSERT_SOME(slave);
MockScheduler sched;
- TestingMesosSchedulerDriver driver(&sched, detector);
+ TestingMesosSchedulerDriver driver(&sched, &detector);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
@@ -761,7 +760,7 @@ TEST_F(MasterTest, MasterInfoOnReElection)
// Simulate a spurious event (e.g., due to ZooKeeper
// expiration) at the scheduler.
- detector->appoint(master.get());
+ detector.appoint(master.get());
AWAIT_READY(disconnected);
@@ -841,15 +840,13 @@ TEST_F(MasterTest, MasterLost)
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- Owned<StandaloneMasterDetector> detector(
- new StandaloneMasterDetector());
- detector->appoint(master.get());
+ StandaloneMasterDetector detector(master.get());
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
- TestingMesosSchedulerDriver driver(&sched, detector.get());
+ TestingMesosSchedulerDriver driver(&sched, &detector);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
@@ -869,7 +866,7 @@ TEST_F(MasterTest, MasterLost)
.WillOnce(FutureSatisfy(&disconnected));
// Simulate a spurious event at the scheduler.
- detector->appoint(None());
+ detector.appoint(None());
AWAIT_READY(disconnected);
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index ae3aeee..a9844e4 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -176,7 +176,7 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
Try<process::PID<slave::Slave> > MesosTest::StartSlave(
slave::Containerizer* containerizer,
- Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags)
{
return cluster.slaves.start(
@@ -187,7 +187,7 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
Try<PID<slave::Slave> > MesosTest::StartSlave(
- Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags)
{
return cluster.slaves.start(
@@ -197,7 +197,7 @@ Try<PID<slave::Slave> > MesosTest::StartSlave(
Try<PID<slave::Slave> > MesosTest::StartSlave(
MockExecutor* executor,
- Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags)
{
slave::Containerizer* containerizer = new TestContainerizer(executor);
http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index f77fbfe..7bc5e98 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -113,19 +113,19 @@ protected:
// Starts a slave with the specified containerizer, detector and flags.
virtual Try<process::PID<slave::Slave> > StartSlave(
slave::Containerizer* containerizer,
- process::Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags = None());
// Starts a slave with the specified MasterDetector and flags.
virtual Try<process::PID<slave::Slave> > StartSlave(
- process::Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags = None());
// Starts a slave with the specified mock executor, MasterDetector
// and flags.
virtual Try<process::PID<slave::Slave> > StartSlave(
MockExecutor* executor,
- process::Owned<MasterDetector> detector,
+ MasterDetector* detector,
const Option<slave::Flags>& flags = None());
// Stop the specified master.