You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/04/13 01:37:30 UTC

[1/5] git commit: Added log implementation for state storage.

Repository: mesos
Updated Branches:
  refs/heads/master 847db5288 -> 48dd007c4


Added log implementation for state storage.

Review: https://reviews.apache.org/r/19007


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/48dd007c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/48dd007c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/48dd007c

Branch: refs/heads/master
Commit: 48dd007c4d5945e2ee9f75dd4e1164a0c66f5b79
Parents: df2248b
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Jan 25 12:57:05 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600

----------------------------------------------------------------------
 src/Makefile.am           |   2 +
 src/messages/state.proto  |  23 ++
 src/state/log.cpp         | 531 +++++++++++++++++++++++++++++++++++++++++
 src/state/log.hpp         |  45 ++++
 src/tests/state_tests.cpp | 159 +++++++++++-
 5 files changed, 759 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 95f133d..560b4c7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -351,10 +351,12 @@ noinst_LTLIBRARIES += libstate.la
 libstate_la_SOURCES =							\
   state/in_memory.cpp							\
   state/leveldb.cpp							\
+  state/log.cpp								\
   state/zookeeper.cpp
 libstate_la_SOURCES +=							\
   state/in_memory.hpp							\
   state/leveldb.hpp							\
+  state/log.hpp								\
   state/protobuf.hpp							\
   state/state.hpp							\
   state/storage.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/messages/state.proto
----------------------------------------------------------------------
diff --git a/src/messages/state.proto b/src/messages/state.proto
index 7f7a8a5..59276e5 100644
--- a/src/messages/state.proto
+++ b/src/messages/state.proto
@@ -24,3 +24,26 @@ message Entry {
   required bytes uuid = 2;
   required bytes value = 3;
 }
+
+
+// Describes an operation used in the log storage implementation.
+message Operation {
+  enum Type {
+    SNAPSHOT = 1;
+    EXPUNGE = 2;
+  }
+
+  // Describes a "snapshot" operation.
+  message Snapshot {
+    required Entry entry = 1;
+  }
+
+  // Describes an "expunge" operation.
+  message Expunge {
+    required string name = 1;
+  }
+
+  required Type type = 1;
+  optional Snapshot snapshot = 2;
+  optional Expunge expunge = 3;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/state/log.cpp
----------------------------------------------------------------------
diff --git a/src/state/log.cpp b/src/state/log.cpp
new file mode 100644
index 0000000..fd8b28a
--- /dev/null
+++ b/src/state/log.cpp
@@ -0,0 +1,531 @@
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <list>
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/mutex.hpp>
+#include <process/process.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "log/log.hpp"
+
+#include "state/log.hpp"
+
+using namespace mesos::internal::log;
+using namespace process;
+
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
+using std::list;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// A storage implementation for State that uses the replicated
+// log. The log is made up of appended operations. Each state entry is
+// mapped to a log "snapshot".
+//
+// All operations are gated by 'start()' which makes sure that a
+// Log::Writer has been started and all positions in the log have been
+// read and cached in memory. All reads are performed by this cache
+// (for now). If the Log::Writer gets demoted (i.e., because another
+// writer started) then the current operation will return false
+// implying the operation was not atomic and subsequent operations
+// will re-'start()' which will again read all positions to make sure
+// operations are consistent.
+// TODO(benh): Log demotion does not necessarily imply a non-atomic
+// read/modify/write. An alternative strategy might be to retry after
+// restarting via 'start' (and holding on to the mutex so no other
+// operations are attempted).
+class LogStorageProcess : public Process<LogStorageProcess>
+{
+public:
+  LogStorageProcess(Log* log);
+
+  virtual ~LogStorageProcess();
+
+  // Storage implementation.
+  Future<Option<state::Entry> > get(const string& name);
+  Future<bool> set(const state::Entry& entry, const UUID& uuid);
+  Future<bool> expunge(const state::Entry& entry);
+  Future<std::set<string> > names();
+
+protected:
+  virtual void finalize();
+
+private:
+  Future<Nothing> start();
+  Future<Nothing> _start(const Option<Log::Position>& position);
+  Future<Nothing> __start(
+      const Log::Position& beginning,
+      const Log::Position& position);
+
+  // Helper for applying log entries.
+  Future<Nothing> apply(const list<Log::Entry>& entries);
+
+  // Helper for performing truncation.
+  void truncate();
+  Future<Nothing> _truncate();
+  Future<Nothing> __truncate(
+      const Log::Position& minimum,
+      const Option<Log::Position>& position);
+
+  // Continuations.
+  Future<Option<state::Entry> > _get(const string& name);
+
+  Future<bool> _set(const state::Entry& entry, const UUID& uuid);
+  Future<bool> __set(const state::Entry& entry, const UUID& uuid);
+  Future<bool> ___set(
+      const state::Entry& entry,
+      const Option<Log::Position>& position);
+
+  Future<bool> _expunge(const state::Entry& entry);
+  Future<bool> __expunge(const state::Entry& entry);
+  Future<bool> ___expunge(
+      const state::Entry& entry,
+      const Option<Log::Position>& position);
+
+  Future<std::set<string> > _names();
+
+  Log::Reader reader;
+  Log::Writer writer;
+
+  // Used to serialize Log::Writer::append/truncate operations.
+  Mutex mutex;
+
+  // Whether or not we've started the ability to append to log.
+  Option<Future<Nothing> > starting;
+
+  // Last position in the log that we've read or written.
+  Option<Log::Position> index;
+
+  // Last position in the log up to which we've truncated.
+  Option<Log::Position> truncated;
+
+  // Note that while it would be nice to just use Operation::Snapshot
+  // modified to include a required field called 'position' we don't
+  // know the position (nor can we determine it) before we've done the
+  // actual appending of the data.
+  struct Snapshot
+  {
+    Snapshot(const Log::Position& position, const state::Entry& entry)
+      : position(position), entry(entry) {}
+
+    const Log::Position position;
+
+    // TODO(benh): Rather than storing the entire state::Entry we
+    // should just store the position, name, and UUID and cache the
+    // data so we don't use too much memory.
+    const state::Entry entry;
+  };
+
+  // All known snapshots indexed by name. Note that 'hashmap::get'
+  // must be used instead of 'operator []' since Snapshot doesn't have
+  // a default/empty constructor.
+  hashmap<string, Snapshot> snapshots;
+};
+
+
+LogStorageProcess::LogStorageProcess(Log* log)
+  : reader(log),
+    writer(log) {}
+
+
+LogStorageProcess::~LogStorageProcess() {}
+
+
+void LogStorageProcess::finalize()
+{
+  if (starting.isSome()) {
+    Future<Nothing>(starting.get()).discard();
+  }
+}
+
+
+Future<Nothing> LogStorageProcess::start()
+{
+  if (starting.isSome()) {
+    return starting.get();
+  }
+
+  starting = writer.start()
+    .then(defer(self(), &Self::_start, lambda::_1));
+
+  return starting.get();
+}
+
+
+Future<Nothing> LogStorageProcess::_start(
+    const Option<Log::Position>& position)
+{
+  CHECK_SOME(starting);
+
+  if (position.isNone()) {
+    starting = None(); // Reset 'starting' so we try again.
+    return start(); // TODO(benh): Don't try again forever?
+  }
+
+  // Now read and apply log entries. Since 'start' can be called
+  // multiple times (i.e., since we reset 'starting' after getting a
+  // None position returned after 'set', 'expunge', etc) we need to
+  // check and see if we've already successfully read the log at least
+  // once by checking 'index'. If we haven't yet read the log (i.e.,
+  // this is the first call to 'start' and 'index' is None) then we
+  // get the beginning of the log first so we can read from that up to
+  // what ever position was known at the time we started the
+  // writer. Note that it should always be safe to read a truncated
+  // entry since a subsequent operation in the log should invalidate
+  // that entry when we read it instead.
+  if (index.isSome()) {
+    // If we've started before (i.e., have an 'index' position) we
+    // should also expect know the last 'truncated' position.
+    CHECK_SOME(truncated);
+    return reader.read(index.get(), position.get())
+      .then(defer(self(), &Self::apply, lambda::_1));
+  }
+
+  return reader.beginning()
+    .then(defer(self(), &Self::__start, lambda::_1, position.get()));
+}
+
+
+Future<Nothing> LogStorageProcess::__start(
+    const Log::Position& beginning,
+    const Log::Position& position)
+{
+  CHECK_SOME(starting);
+
+  truncated = beginning; // Cache for future truncations.
+
+  return reader.read(beginning, position)
+    .then(defer(self(), &Self::apply, lambda::_1));
+}
+
+
+Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries)
+{
+  // Only read and apply entries past our index.
+  foreach (const Log::Entry& entry, entries) {
+    if (index.isNone() || index.get() < entry.position) {
+      // Parse the Operation from the Log::Entry.
+      Operation operation;
+
+      google::protobuf::io::ArrayInputStream stream(
+          entry.data.data(),
+          entry.data.size());
+
+      if (!operation.ParseFromZeroCopyStream(&stream)) {
+        return Failure("Failed to deserialize Operation");
+      }
+
+      switch (operation.type()) {
+        case Operation::SNAPSHOT: {
+          CHECK(operation.has_snapshot());
+
+          // Add or update the snapshot.
+          Snapshot snapshot(entry.position, operation.snapshot().entry());
+          snapshots.put(snapshot.entry.name(), snapshot);
+          break;
+        }
+
+        case Operation::EXPUNGE: {
+          CHECK(operation.has_expunge());
+          snapshots.erase(operation.expunge().name());
+          break;
+        }
+
+        default:
+          return Failure("Unknown operation: " + stringify(operation.type()));
+      }
+
+      index = entry.position;
+    }
+  }
+
+  return Nothing();
+}
+
+
+// TODO(benh): Truncation could be optimized by saving the "oldest"
+// snapshot and only doing a truncation if/when we update that
+// snapshot.
+// TODO(benh): Truncation is not enough to keep the log size small as
+// the log could get very fragmented. We'll need a way to defragment
+// the log as some state entries might not get set over a long period
+// of time and their associated snapshots are causing the log to grow
+// very big.
+void LogStorageProcess::truncate()
+{
+  // We lock the truncation since it includes a call to
+  // Log::Writer::truncate which must be serialized with calls to
+  // Log::Writer::append.
+  mutex.lock()
+    .then(defer(self(), &Self::_truncate))
+    .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<Nothing> LogStorageProcess::_truncate()
+{
+  // Determine the minimum necessary position for all the snapshots.
+  Option<Log::Position> minimum = None();
+
+  foreachvalue (const Snapshot& snapshot, snapshots) {
+    minimum = min(minimum, snapshot.position);
+  }
+
+  CHECK_SOME(truncated);
+
+  if (minimum.isSome() && minimum.get() > truncated.get()) {
+    return writer.truncate(minimum.get())
+      .then(defer(self(), &Self::__truncate, minimum.get(), lambda::_1));
+
+    // NOTE: Any failure from Log::Writer::truncate doesn't propagate
+    // since the expectation is any subsequent Log::Writer::append
+    // would cause a failure. Furthermore, if the failure was
+    // temporary any subsequent Log::Writer::truncate should rectify a
+    // "missing" truncation.
+  }
+
+  return Nothing();
+}
+
+
+Future<Nothing> LogStorageProcess::__truncate(
+    const Log::Position& minimum,
+    const Option<Log::Position>& position)
+{
+  // Don't bother retrying truncation if we're demoted, we'll
+  // just try again the next time 'truncate()' gets called
+  // (after we've done what's necessary to append again).
+  if (position.isSome()) {
+    truncated = max(truncated, minimum);
+    index = max(index, position);
+  }
+
+  return Nothing();
+}
+
+
+Future<Option<state::Entry> > LogStorageProcess::get(const string& name)
+{
+  return start()
+    .then(defer(self(), &Self::_get, name));
+}
+
+
+Future<Option<state::Entry> > LogStorageProcess::_get(const string& name)
+{
+  Option<Snapshot> snapshot = snapshots.get(name);
+
+  if (snapshot.isNone()) {
+    return None();
+  }
+
+  return snapshot.get().entry;
+}
+
+
+Future<bool> LogStorageProcess::set(
+    const state::Entry& entry,
+    const UUID& uuid)
+{
+  return mutex.lock()
+    .then(defer(self(), &Self::_set, entry, uuid))
+    .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<bool> LogStorageProcess::_set(
+    const state::Entry& entry,
+    const UUID& uuid)
+{
+  return start()
+    .then(defer(self(), &Self::__set, entry, uuid));
+}
+
+
+Future<bool> LogStorageProcess::__set(
+    const state::Entry& entry,
+    const UUID& uuid)
+{
+  // Check the version first (if we've already got a snapshot).
+  Option<Snapshot> snapshot = snapshots.get(entry.name());
+
+  if (snapshot.isSome()) {
+    if (UUID::fromBytes(snapshot.get().entry.uuid()) != uuid) {
+      return false;
+    }
+  }
+
+  // Now serialize and append a snapshot operation.
+  Operation operation;
+  operation.set_type(Operation::SNAPSHOT);
+  operation.mutable_snapshot()->mutable_entry()->CopyFrom(entry);
+
+  string value;
+  if (!operation.SerializeToString(&value)) {
+    return Failure("Failed to serialize Operation");
+  }
+
+  return writer.append(value)
+    .then(defer(self(), &Self::___set, entry, lambda::_1));
+}
+
+
+Future<bool> LogStorageProcess::___set(
+    const state::Entry& entry,
+    const Option<Log::Position>& position)
+{
+  if (position.isNone()) {
+    starting = None(); // Reset 'starting' so we try again.
+    return false;
+  }
+
+  // Add (or update) the snapshot for this entry and truncate
+  // the log if possible.
+  CHECK(!snapshots.contains(entry.name()) ||
+        snapshots.get(entry.name()).get().position < position.get());
+
+  Snapshot snapshot(position.get(), entry);
+  snapshots.put(snapshot.entry.name(), snapshot);
+  truncate();
+
+  // Update index so we don't bother with this position again.
+  index = max(index, position);
+
+  return true;
+}
+
+
+Future<bool> LogStorageProcess::expunge(const state::Entry& entry)
+{
+  return mutex.lock()
+    .then(defer(self(), &Self::_expunge, entry))
+    .onAny(lambda::bind(&Mutex::unlock, mutex));
+}
+
+
+Future<bool> LogStorageProcess::_expunge(const state::Entry& entry)
+{
+  return start()
+    .then(defer(self(), &Self::__expunge, entry));
+}
+
+
+Future<bool> LogStorageProcess::__expunge(const state::Entry& entry)
+{
+  Option<Snapshot> snapshot = snapshots.get(entry.name());
+
+  if (snapshot.isNone()) {
+    return false;
+  }
+
+  // Check the version first.
+  if (UUID::fromBytes(snapshot.get().entry.uuid()) !=
+      UUID::fromBytes(entry.uuid())) {
+    return false;
+  }
+
+  // Now serialize and append an expunge operation.
+  Operation operation;
+  operation.set_type(Operation::EXPUNGE);
+  operation.mutable_expunge()->set_name(entry.name());
+
+  string value;
+  if (!operation.SerializeToString(&value)) {
+    return Failure("Failed to serialize Operation");
+  }
+
+  return writer.append(value)
+    .then(defer(self(), &Self::___expunge, entry, lambda::_1));
+}
+
+
+Future<bool> LogStorageProcess::___expunge(
+    const state::Entry& entry,
+    const Option<Log::Position>& position)
+{
+  if (position.isNone()) {
+    starting = None(); // Reset 'starting' so we try again.
+    return false;
+  }
+
+  // Remove from snapshots and truncate the log if possible.
+  CHECK(snapshots.contains(entry.name()));
+  snapshots.erase(entry.name());
+  truncate();
+
+  return true;
+}
+
+
+Future<std::set<string> > LogStorageProcess::names()
+{
+  return start()
+    .then(defer(self(), &Self::_names));
+}
+
+
+Future<std::set<string> > LogStorageProcess::_names()
+{
+  const hashset<string>& keys = snapshots.keys();
+  return std::set<string>(keys.begin(), keys.end());
+}
+
+
+LogStorage::LogStorage(Log* log)
+{
+  process = new LogStorageProcess(log);
+  spawn(process);
+}
+
+
+LogStorage::~LogStorage()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+Future<Option<state::Entry> > LogStorage::get(const string& name)
+{
+  return dispatch(process, &LogStorageProcess::get, name);
+}
+
+
+Future<bool> LogStorage::set(const state::Entry& entry, const UUID& uuid)
+{
+  return dispatch(process, &LogStorageProcess::set, entry, uuid);
+}
+
+
+Future<bool> LogStorage::expunge(const state::Entry& entry)
+{
+  return dispatch(process, &LogStorageProcess::expunge, entry);
+}
+
+
+Future<std::set<string> > LogStorage::names()
+{
+  return dispatch(process, &LogStorageProcess::names);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/state/log.hpp
----------------------------------------------------------------------
diff --git a/src/state/log.hpp b/src/state/log.hpp
new file mode 100644
index 0000000..e25d1e5
--- /dev/null
+++ b/src/state/log.hpp
@@ -0,0 +1,45 @@
+#ifndef __STATE_LOG_HPP__
+#define __STATE_LOG_HPP__
+
+#include <set>
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class LogStorageProcess;
+
+
+class LogStorage : public Storage
+{
+public:
+  LogStorage(log::Log* log);
+
+  virtual ~LogStorage();
+
+  // Storage implementation.
+  virtual process::Future<Option<Entry> > get(const std::string& name);
+  virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+  virtual process::Future<bool> expunge(const Entry& entry);
+  virtual process::Future<std::set<std::string> > names();
+
+private:
+  LogStorageProcess* process;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_LOG_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/48dd007c/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index 678b1c8..56feb27 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -26,6 +26,7 @@
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/protobuf.hpp>
+#include <process/pid.hpp>
 
 #include <stout/gtest.hpp>
 #include <stout/option.hpp>
@@ -34,14 +35,20 @@
 
 #include "common/type_utils.hpp"
 
+#include "log/log.hpp"
+#include "log/replica.hpp"
+#include "log/tool/initialize.hpp"
+
 #include "master/registry.hpp"
 
 #include "state/in_memory.hpp"
 #include "state/leveldb.hpp"
+#include "state/log.hpp"
 #include "state/protobuf.hpp"
 #include "state/storage.hpp"
 #include "state/zookeeper.hpp"
 
+#include "tests/utils.hpp"
 #ifdef MESOS_HAS_JAVA
 #include "tests/zookeeper.hpp"
 #endif
@@ -63,6 +70,11 @@ using state::ZooKeeperStorage;
 using state::protobuf::State;
 using state::protobuf::Variable;
 
+using std::set;
+using std::string;
+
+using mesos::internal::tests::TemporaryDirectoryTest;
+
 typedef mesos::internal::Registry::Slaves Slaves;
 typedef mesos::internal::Registry::Slave Slave;
 
@@ -400,7 +412,7 @@ protected:
   State* state;
 
 private:
-  const std::string path;
+  const string path;
 };
 
 
@@ -446,6 +458,151 @@ TEST_F(LevelDBStateTest, Names)
 }
 
 
+class LogStateTest : public TemporaryDirectoryTest
+{
+public:
+  LogStateTest()
+    : storage(NULL),
+      state(NULL),
+      replica2(NULL),
+      log(NULL) {}
+
+protected:
+  virtual void SetUp()
+  {
+    TemporaryDirectoryTest::SetUp();
+
+    // For initializing the replicas.
+    log::tool::Initialize initializer;
+
+    string path1 = os::getcwd() + "/.log1";
+    string path2 = os::getcwd() + "/.log2";
+
+    initializer.flags.path = path1;
+    initializer.execute();
+
+    initializer.flags.path = path2;
+    initializer.execute();
+
+    // Only create the replica for 'path2' (i.e., the second replica)
+    // as the first replica will be created when we create a Log.
+    replica2 = new log::Replica(path2);
+
+    set<UPID> pids;
+    pids.insert(replica2->pid());
+
+    log = new log::Log(2, path1, pids);
+    storage = new state::LogStorage(log);
+    state = new State(storage);
+  }
+
+  virtual void TearDown()
+  {
+    delete state;
+    delete storage;
+    delete log;
+
+    delete replica2;
+
+    TemporaryDirectoryTest::TearDown();
+  }
+
+  state::Storage* storage;
+  State* state;
+
+  log::Replica* replica2;
+  log::Log* log;
+};
+
+
+TEST_F(LogStateTest, FetchAndStoreAndFetch)
+{
+  FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndStoreAndFetch)
+{
+  FetchAndStoreAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndStoreFailAndFetch)
+{
+  FetchAndStoreAndStoreFailAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndFetch)
+{
+  FetchAndStoreAndExpungeAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndExpunge)
+{
+  FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(LogStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+  FetchAndStoreAndExpungeAndStoreAndFetch(state);
+}
+
+
+TEST_F(LogStateTest, Names)
+{
+  Names(state);
+}
+
+
+Future<Option<Variable<Slaves> > > timeout(
+    Future<Option<Variable<Slaves> > > future)
+{
+  future.discard();
+  return Failure("Timeout");
+}
+
+
+TEST_F(LogStateTest, Timeout)
+{
+  Clock::pause();
+
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
+  AWAIT_READY(future1);
+
+  Variable<Slaves> variable = future1.get();
+
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
+
+  Slave* slave = slaves1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+
+  variable = variable.mutate(slaves1);
+
+  // Now terminate the replica so the store will timeout.
+  terminate(replica2->pid());
+  wait(replica2->pid());
+
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
+
+  Future<Option<Variable<Slaves> > > future3 =
+    future2.after(Seconds(5), lambda::bind(&timeout, lambda::_1));
+
+  ASSERT_TRUE(future2.isPending());
+  ASSERT_TRUE(future3.isPending());
+
+  Clock::advance(Seconds(5));
+
+  AWAIT_DISCARDED(future2);
+  AWAIT_FAILED(future3);
+
+  Clock::resume();
+}
+
+
 #ifdef MESOS_HAS_JAVA
 class ZooKeeperStateTest : public tests::ZooKeeperTest
 {


[5/5] git commit: Used Future::after in Network.

Posted by be...@apache.org.
Used Future::after in Network.

Review: https://reviews.apache.org/r/19674


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58241c03
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58241c03
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58241c03

Branch: refs/heads/master
Commit: 58241c03215749fbb7452cd2f8db6b5428055753
Parents: dcff80c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Mar 11 20:44:54 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600

----------------------------------------------------------------------
 src/log/network.hpp | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58241c03/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index 488ed49..788d9eb 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -29,7 +29,6 @@
 #include <process/collect.hpp>
 #include <process/executor.hpp>
 #include <process/protobuf.hpp>
-#include <process/timeout.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
@@ -118,6 +117,15 @@ public:
 private:
   typedef ZooKeeperNetwork This;
 
+  // Helper for handling time outs when collecting membership
+  // data. For now, a timeout is treated as a failure.
+  static process::Future<std::list<std::string> > timedout(
+      process::Future<std::list<std::string> > datas)
+  {
+    datas.discard();
+    return process::Failure("Timed out");
+  }
+
   // Not copyable, not assignable.
   ZooKeeperNetwork(const ZooKeeperNetwork&);
   ZooKeeperNetwork& operator = (const ZooKeeperNetwork&);
@@ -411,7 +419,8 @@ inline void ZooKeeperNetwork::watched(
     futures.push_back(group.data(membership));
   }
 
-  process::collect(futures, process::Timeout::in(Seconds(5)))
+  process::collect(futures)
+    .after(Seconds(5), lambda::bind(&This::timedout, lambda::_1))
     .onAny(executor.defer(lambda::bind(&This::collected, this, lambda::_1)));
 }
 


[4/5] git commit: Refactored State::names to return a set instead of vector.

Posted by be...@apache.org.
Refactored State::names to return a set instead of vector.

Review: https://reviews.apache.org/r/19835


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df2248bb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df2248bb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df2248bb

Branch: refs/heads/master
Commit: df2248bb660f145757a21f6c423df5eabf0694ee
Parents: bafce31
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Mar 29 13:57:43 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600

----------------------------------------------------------------------
 .../org_apache_mesos_state_AbstractState.cpp    | 18 ++++++++---------
 src/state/in_memory.cpp                         | 11 +++++-----
 src/state/in_memory.hpp                         |  4 ++--
 src/state/leveldb.cpp                           | 15 +++++++-------
 src/state/leveldb.hpp                           |  4 ++--
 src/state/state.hpp                             |  6 +++---
 src/state/storage.hpp                           |  4 ++--
 src/state/zookeeper.cpp                         | 21 +++++++++++---------
 src/state/zookeeper.hpp                         |  4 ++--
 src/tests/state_tests.cpp                       |  8 +++++---
 10 files changed, 51 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/java/jni/org_apache_mesos_state_AbstractState.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_AbstractState.cpp b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
index 476bb27..1accc8a 100644
--- a/src/java/jni/org_apache_mesos_state_AbstractState.cpp
+++ b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
@@ -1,7 +1,7 @@
 #include <jni.h>
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/check.hpp>
 #include <process/future.hpp>
@@ -18,8 +18,8 @@ using namespace mesos::internal::state;
 
 using process::Future;
 
+using std::set;
 using std::string;
-using std::vector;
 
 extern "C" {
 
@@ -614,8 +614,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_AbstractState__1_1names
 
   State* state = (State*) env->GetLongField(thiz, __state);
 
-  Future<vector<string> >* future =
-    new Future<vector<string> >(state->names());
+  Future<set<string> >* future =
+    new Future<set<string> >(state->names());
 
   return (jlong) future;
 }
@@ -629,7 +629,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_AbstractState__1_1names
 JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1cancel
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+  Future<set<string> >* future = (Future<set<string> >*) jfuture;
 
   // We'll initiate a discard but we won't consider it cancelled since
   // we don't know if/when the future will get discarded.
@@ -663,7 +663,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
 JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1is_1done
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+  Future<set<string> >* future = (Future<set<string> >*) jfuture;
 
   return (jboolean) !future->isPending() || future->hasDiscard();
 }
@@ -677,7 +677,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
 JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1get
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+  Future<set<string> >* future = (Future<set<string> >*) jfuture;
 
   future->await();
 
@@ -725,7 +725,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
 JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1get_1timeout
   (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
 {
-  Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+  Future<set<string> >* future = (Future<set<string> >*) jfuture;
 
   jclass clazz = env->GetObjectClass(junit);
 
@@ -787,7 +787,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
 JNIEXPORT void JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1finalize
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
+  Future<set<string> >* future = (Future<set<string> >*) jfuture;
 
   delete future;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/in_memory.cpp
----------------------------------------------------------------------
diff --git a/src/state/in_memory.cpp b/src/state/in_memory.cpp
index b184565..ce04e47 100644
--- a/src/state/in_memory.cpp
+++ b/src/state/in_memory.cpp
@@ -1,5 +1,5 @@
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
@@ -16,8 +16,9 @@
 
 using namespace process;
 
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
 using std::string;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -60,10 +61,10 @@ public:
     return true;
   }
 
-  vector<string> names()
+  std::set<string> names() // Use std:: to disambiguate 'set' member.
   {
     const hashset<string>& keys = entries.keys();
-    return vector<string>(keys.begin(), keys.end());
+    return std::set<string>(keys.begin(), keys.end());
   }
 
 private:
@@ -105,7 +106,7 @@ Future<bool> InMemoryStorage::expunge(const Entry& entry)
 }
 
 
-Future<vector<string> > InMemoryStorage::names()
+Future<std::set<string> > InMemoryStorage::names()
 {
   return dispatch(process, &InMemoryStorageProcess::names);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/in_memory.hpp
----------------------------------------------------------------------
diff --git a/src/state/in_memory.hpp b/src/state/in_memory.hpp
index 822a89a..2040618 100644
--- a/src/state/in_memory.hpp
+++ b/src/state/in_memory.hpp
@@ -1,8 +1,8 @@
 #ifndef __STATE_IN_MEMORY_HPP__
 #define __STATE_IN_MEMORY_HPP__
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/future.hpp>
 
@@ -31,7 +31,7 @@ public:
   virtual process::Future<Option<Entry> > get(const std::string& name);
   virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
   virtual process::Future<bool> expunge(const Entry& entry);
-  virtual process::Future<std::vector<std::string> > names();
+  virtual process::Future<std::set<std::string> > names();
 
 private:
   InMemoryStorageProcess* process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 6db3ecf..4303df3 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -4,8 +4,8 @@
 
 #include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
@@ -27,8 +27,9 @@
 
 using namespace process;
 
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
 using std::string;
-using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -47,7 +48,7 @@ public:
   Future<Option<Entry> > get(const string& name);
   Future<bool> set(const Entry& entry, const UUID& uuid);
   Future<bool> expunge(const Entry& entry);
-  Future<vector<string> > names();
+  Future<std::set<string> > names();
 
 private:
   // Helpers for interacting with leveldb.
@@ -88,20 +89,20 @@ void LevelDBStorageProcess::initialize()
 }
 
 
-Future<vector<string> > LevelDBStorageProcess::names()
+Future<std::set<string> > LevelDBStorageProcess::names()
 {
   if (error.isSome()) {
     return Failure(error.get());
   }
 
-  vector<string> results;
+  std::set<string> results;
 
   leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
 
   iterator->SeekToFirst();
 
   while (iterator->Valid()) {
-    results.push_back(iterator->key().ToString());
+    results.insert(iterator->key().ToString());
     iterator->Next();
   }
 
@@ -287,7 +288,7 @@ Future<bool> LevelDBStorage::expunge(const Entry& entry)
 }
 
 
-Future<vector<string> > LevelDBStorage::names()
+Future<std::set<string> > LevelDBStorage::names()
 {
   return dispatch(process, &LevelDBStorageProcess::names);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.hpp b/src/state/leveldb.hpp
index f2d6d27..53447c6 100644
--- a/src/state/leveldb.hpp
+++ b/src/state/leveldb.hpp
@@ -1,8 +1,8 @@
 #ifndef __STATE_LEVELDB_HPP__
 #define __STATE_LEVELDB_HPP__
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/future.hpp>
 
@@ -32,7 +32,7 @@ public:
   virtual process::Future<Option<Entry> > get(const std::string& name);
   virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
   virtual process::Future<bool> expunge(const Entry& entry);
-  virtual process::Future<std::vector<std::string> > names();
+  virtual process::Future<std::set<std::string> > names();
 
 private:
   LevelDBStorageProcess* process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/state.hpp
----------------------------------------------------------------------
diff --git a/src/state/state.hpp b/src/state/state.hpp
index bebfe02..2c0bb20 100644
--- a/src/state/state.hpp
+++ b/src/state/state.hpp
@@ -19,8 +19,8 @@
 #ifndef __STATE_STATE_HPP__
 #define __STATE_STATE_HPP__
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/deferred.hpp> // TODO(benh): This is required by Clang.
 #include <process/future.hpp>
@@ -111,7 +111,7 @@ public:
   process::Future<bool> expunge(const Variable& variable);
 
   // Returns the collection of variable names in the state.
-  process::Future<std::vector<std::string> > names();
+  process::Future<std::set<std::string> > names();
 
 private:
   // Helpers to handle future results from fetch and swap. We make
@@ -189,7 +189,7 @@ inline process::Future<bool> State::expunge(const Variable& variable)
 }
 
 
-inline process::Future<std::vector<std::string> > State::names()
+inline process::Future<std::set<std::string> > State::names()
 {
   return storage->names();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/storage.hpp
----------------------------------------------------------------------
diff --git a/src/state/storage.hpp b/src/state/storage.hpp
index a137075..a36a93e 100644
--- a/src/state/storage.hpp
+++ b/src/state/storage.hpp
@@ -19,8 +19,8 @@
 #ifndef __STATE_STORAGE_HPP__
 #define __STATE_STORAGE_HPP__
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/future.hpp>
 
@@ -50,7 +50,7 @@ public:
   virtual process::Future<bool> expunge(const Entry& entry) = 0;
 
   // Returns the collection of variable names in the state.
-  virtual process::Future<std::vector<std::string> > names() = 0;
+  virtual process::Future<std::set<std::string> > names() = 0;
 };
 
 } // namespace state {

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index dd632c0..a833eaa 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -3,6 +3,7 @@
 #include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
 
 #include <queue>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -33,6 +34,8 @@
 
 using namespace process;
 
+// Note that we don't add 'using std::set' here because we need
+// 'std::' to disambiguate the 'set' member.
 using std::queue;
 using std::string;
 using std::vector;
@@ -60,7 +63,7 @@ public:
   Future<Option<Entry> > get(const string& name);
   Future<bool> set(const Entry& entry, const UUID& uuid);
   virtual Future<bool> expunge(const Entry& entry);
-  Future<vector<string> > names();
+  Future<std::set<string> > names();
 
   // ZooKeeper events.
   void connected(bool reconnect);
@@ -72,7 +75,7 @@ public:
 
 private:
   // Helpers for getting the names, fetching, and swapping.
-  Result<vector<string> > doNames();
+  Result<std::set<string> > doNames();
   Result<Option<Entry> > doGet(const string& name);
   Result<bool> doSet(const Entry& entry, const UUID& uuid);
   Result<bool> doExpunge(const Entry& entry);
@@ -99,7 +102,7 @@ private:
 
   struct Names
   {
-    Promise<vector<string> > promise;
+    Promise<std::set<string> > promise;
   };
 
   struct Get
@@ -191,7 +194,7 @@ void ZooKeeperStorageProcess::initialize()
 }
 
 
-Future<vector<string> > ZooKeeperStorageProcess::names()
+Future<std::set<string> > ZooKeeperStorageProcess::names()
 {
   if (error.isSome()) {
     return Failure(error.get());
@@ -201,7 +204,7 @@ Future<vector<string> > ZooKeeperStorageProcess::names()
     return names->promise.future();
   }
 
-  Result<vector<string> > result = doNames();
+  Result<std::set<string> > result = doNames();
 
   if (result.isNone()) { // Try again later.
     Names* names = new Names();
@@ -308,7 +311,7 @@ void ZooKeeperStorageProcess::connected(bool reconnect)
 
   while (!pending.names.empty()) {
     Names* names = pending.names.front();
-    Result<vector<string> > result = doNames();
+    Result<std::set<string> > result = doNames();
     if (result.isNone()) {
       return; // Try again later.
     } else if (result.isError()) {
@@ -385,7 +388,7 @@ void ZooKeeperStorageProcess::deleted(const string& path)
 }
 
 
-Result<vector<string> > ZooKeeperStorageProcess::doNames()
+Result<std::set<string> > ZooKeeperStorageProcess::doNames()
 {
   // Get all children to determine current memberships.
   vector<string> results;
@@ -404,7 +407,7 @@ Result<vector<string> > ZooKeeperStorageProcess::doNames()
   // TODO(benh): It might make sense to "mangle" the names so that we
   // can determine when a znode has incorrectly been added that
   // actually doesn't store an Entry.
-  return results;
+  return std::set<string>(results.begin(), results.end());
 }
 
 
@@ -627,7 +630,7 @@ Future<bool> ZooKeeperStorage::expunge(const Entry& entry)
 }
 
 
-Future<vector<string> > ZooKeeperStorage::names()
+Future<std::set<string> > ZooKeeperStorage::names()
 {
   return dispatch(process, &ZooKeeperStorageProcess::names);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/state/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.hpp b/src/state/zookeeper.hpp
index 420315b..1a8483d 100644
--- a/src/state/zookeeper.hpp
+++ b/src/state/zookeeper.hpp
@@ -1,8 +1,8 @@
 #ifndef __STATE_ZOOKEEPER_HPP__
 #define __STATE_ZOOKEEPER_HPP__
 
+#include <set>
 #include <string>
-#include <vector>
 
 #include <process/future.hpp>
 
@@ -39,7 +39,7 @@ public:
   virtual process::Future<Option<Entry> > get(const std::string& name);
   virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
   virtual process::Future<bool> expunge(const Entry& entry);
-  virtual process::Future<std::vector<std::string> > names();
+  virtual process::Future<std::set<std::string> > names();
 
 private:
   ZooKeeperStorageProcess* process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/df2248bb/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index d0e0840..678b1c8 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -20,7 +20,6 @@
 
 #include <set>
 #include <string>
-#include <vector>
 
 #include <mesos/mesos.hpp>
 
@@ -52,6 +51,9 @@ using namespace mesos::internal;
 
 using namespace process;
 
+using std::set;
+using std::string;
+
 using state::LevelDBStorage;
 using state::Storage;
 #ifdef MESOS_HAS_JAVA
@@ -297,10 +299,10 @@ void Names(State* state)
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
-  Future<std::vector<std::string> > names = state->names();
+  Future<set<string> > names = state->names();
   AWAIT_READY(names);
   ASSERT_TRUE(names.get().size() == 1);
-  EXPECT_EQ("slaves", names.get()[0]);
+  EXPECT_NE(names.get().find("slaves"), names.get().end());
 }
 
 


[3/5] git commit: Removed timeouts in interfaces in favor of Future::after.

Posted by be...@apache.org.
Removed timeouts in interfaces in favor of Future::after.

Review: https://reviews.apache.org/r/19672


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dcff80c9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dcff80c9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dcff80c9

Branch: refs/heads/master
Commit: dcff80c97aaac761e3a460e7dc9510940e666c0f
Parents: 847db52
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Mar 11 20:44:27 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/collect.hpp | 72 +++-----------------
 1 file changed, 8 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dcff80c9/3rdparty/libprocess/include/process/collect.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/collect.hpp b/3rdparty/libprocess/include/process/collect.hpp
index f4d39b7..3bee8a6 100644
--- a/3rdparty/libprocess/include/process/collect.hpp
+++ b/3rdparty/libprocess/include/process/collect.hpp
@@ -1,21 +1,15 @@
 #ifndef __PROCESS_COLLECT_HPP__
 #define __PROCESS_COLLECT_HPP__
 
-#include <assert.h>
-
 #include <list>
 
 #include <process/check.hpp>
 #include <process/defer.hpp>
-#include <process/delay.hpp>
 #include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
-#include <process/timeout.hpp>
 
 #include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/option.hpp>
 #include <stout/tuple.hpp>
 
 // TODO(bmahler): Move these into a futures.hpp header to group Future
@@ -28,17 +22,13 @@ namespace process {
 // the result will be a failure. Likewise, if any future fails then
 // the result future will be a failure.
 template <typename T>
-Future<std::list<T> > collect(
-    const std::list<Future<T> >& futures,
-    const Option<Timeout>& timeout = None());
+Future<std::list<T> > collect(const std::list<Future<T> >& futures);
 
 
 // Waits on each future in the specified set and returns the list of
-// non-pending futures. On timeout, the result will be a failure.
+// non-pending futures.
 template <typename T>
-Future<std::list<Future<T> > > await(
-    const std::list<Future<T> >& futures,
-    const Option<Timeout>& timeout = None());
+Future<std::list<Future<T> > > await(const std::list<Future<T> >& futures);
 
 
 // Waits on each future specified and returns the wrapping future
@@ -57,10 +47,8 @@ class CollectProcess : public Process<CollectProcess<T> >
 public:
   CollectProcess(
       const std::list<Future<T> >& _futures,
-      const Option<Timeout>& _timeout,
       Promise<std::list<T> >* _promise)
     : futures(_futures),
-      timeout(_timeout),
       promise(_promise),
       ready(0) {}
 
@@ -74,11 +62,6 @@ public:
     // Stop this nonsense if nobody cares.
     promise->future().onDiscard(defer(this, &CollectProcess::discarded));
 
-    // Only wait as long as requested.
-    if (timeout.isSome()) {
-      delay(timeout.get().remaining(), this, &CollectProcess::timedout);
-    }
-
     typename std::list<Future<T> >::const_iterator iterator;
     for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
       (*iterator).onAny(defer(this, &CollectProcess::waited, lambda::_1));
@@ -92,20 +75,6 @@ private:
     terminate(this);
   }
 
-  void timedout()
-  {
-    // Need to discard all of the futures so any of their associated
-    // resources can get properly cleaned up.
-    typename std::list<Future<T> >::const_iterator iterator;
-    for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
-      Future<T> future = *iterator; // Need a non-const copy to discard.
-      future.discard();
-    }
-
-    promise->fail("Collect failed: timed out");
-    terminate(this);
-  }
-
   void waited(const Future<T>& future)
   {
     if (future.isFailed()) {
@@ -129,7 +98,6 @@ private:
   }
 
   const std::list<Future<T> > futures;
-  const Option<Timeout> timeout;
   Promise<std::list<T> >* promise;
   size_t ready;
 };
@@ -141,10 +109,8 @@ class AwaitProcess : public Process<AwaitProcess<T> >
 public:
   AwaitProcess(
       const std::list<Future<T> >& _futures,
-      const Option<Timeout>& _timeout,
       Promise<std::list<Future<T> > >* _promise)
     : futures(_futures),
-      timeout(_timeout),
       promise(_promise),
       ready(0) {}
 
@@ -158,11 +124,6 @@ public:
     // Stop this nonsense if nobody cares.
     promise->future().onDiscard(defer(this, &AwaitProcess::discarded));
 
-    // Only wait as long as requested.
-    if (timeout.isSome()) {
-      delay(timeout.get().remaining(), this, &AwaitProcess::timedout);
-    }
-
     typename std::list<Future<T> >::const_iterator iterator;
     for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
       (*iterator).onAny(defer(this, &AwaitProcess::waited, lambda::_1));
@@ -176,23 +137,9 @@ private:
     terminate(this);
   }
 
-  void timedout()
-  {
-    // Need to discard all of the futures so any of their associated
-    // resources can get properly cleaned up.
-    typename std::list<Future<T> >::const_iterator iterator;
-    for (iterator = futures.begin(); iterator != futures.end(); ++iterator) {
-      Future<T> future = *iterator; // Need a non-const copy to discard.
-      future.discard();
-    }
-
-    promise->fail("Collect failed: timed out");
-    terminate(this);
-  }
-
   void waited(const Future<T>& future)
   {
-    assert(!future.isPending());
+    CHECK(!future.isPending());
 
     ready += 1;
     if (ready == futures.size()) {
@@ -202,7 +149,6 @@ private:
   }
 
   const std::list<Future<T> > futures;
-  const Option<Timeout> timeout;
   Promise<std::list<Future<T> > >* promise;
   size_t ready;
 };
@@ -233,8 +179,7 @@ Future<tuples::tuple<Future<T1>, Future<T2> > > __await(
 
 template <typename T>
 inline Future<std::list<T> > collect(
-    const std::list<Future<T> >& futures,
-    const Option<Timeout>& timeout)
+    const std::list<Future<T> >& futures)
 {
   if (futures.empty()) {
     return std::list<T>();
@@ -242,15 +187,14 @@ inline Future<std::list<T> > collect(
 
   Promise<std::list<T> >* promise = new Promise<std::list<T> >();
   Future<std::list<T> > future = promise->future();
-  spawn(new internal::CollectProcess<T>(futures, timeout, promise), true);
+  spawn(new internal::CollectProcess<T>(futures, promise), true);
   return future;
 }
 
 
 template <typename T>
 inline Future<std::list<Future<T> > > await(
-    const std::list<Future<T> >& futures,
-    const Option<Timeout>& timeout)
+    const std::list<Future<T> >& futures)
 {
   if (futures.empty()) {
     return futures;
@@ -259,7 +203,7 @@ inline Future<std::list<Future<T> > > await(
   Promise<std::list<Future<T> > >* promise =
     new Promise<std::list<Future<T> > >();
   Future<std::list<Future<T> > > future = promise->future();
-  spawn(new internal::AwaitProcess<T>(futures, timeout, promise), true);
+  spawn(new internal::AwaitProcess<T>(futures, promise), true);
   return future;
 }
 


[2/5] git commit: Cleaned up master detector usage in tests.

Posted by be...@apache.org.
Cleaned up master detector usage in tests.

Review: https://reviews.apache.org/r/19834


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bafce319
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bafce319
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bafce319

Branch: refs/heads/master
Commit: bafce31921254459b0cf426f2d72637ec38a135f
Parents: 58241c0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Mar 29 14:06:41 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Apr 12 17:35:37 2014 -0600

----------------------------------------------------------------------
 src/tests/cluster.hpp                         | 121 +++++++++++----------
 src/tests/fault_tolerance_tests.cpp           |  98 ++++++++---------
 src/tests/master_contender_detector_tests.cpp |   5 +-
 src/tests/master_tests.cpp                    |  17 ++-
 src/tests/mesos.cpp                           |   6 +-
 src/tests/mesos.hpp                           |   6 +-
 6 files changed, 127 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 11684d9..8479fe3 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -91,10 +91,9 @@ public:
     // the launched master.  If no allocator process is specified then
     // the default allocator will be instantiated.
     Try<process::PID<master::Master> > start(
-        Option<master::allocator::AllocatorProcess*> allocatorProcess,
+        const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
         const master::Flags& flags = master::Flags());
 
-
     // Stops and cleans up a master at the specified PID.
     Try<Nothing> stop(const process::PID<master::Master>& pid);
 
@@ -110,9 +109,9 @@ public:
     Option<zookeeper::URL> url;
 
     // Encapsulates a single master's dependencies.
-    struct MasterInfo
+    struct Master
     {
-      MasterInfo()
+      Master()
         : master(NULL),
           allocator(NULL),
           allocatorProcess(NULL),
@@ -130,7 +129,7 @@ public:
       MasterDetector* detector;
     };
 
-    std::map<process::PID<master::Master>, MasterInfo> masters;
+    std::map<process::PID<master::Master>, Master> masters;
   };
 
   // Abstracts the slaves of a cluster.
@@ -159,12 +158,12 @@ public:
     // Detector. The detector is expected to outlive the launched
     // slave (i.e., until it is stopped via Slaves::stop).
     Try<process::PID<slave::Slave> > start(
-        process::Owned<MasterDetector> detector,
+        const Option<MasterDetector*>& detector,
         const slave::Flags& flags = slave::Flags());
 
     Try<process::PID<slave::Slave> > start(
         slave::Containerizer* containerizer,
-        process::Owned<MasterDetector> detector,
+        const Option<MasterDetector*>& detector,
         const slave::Flags& flags = slave::Flags());
 
     // Stops and cleans up a slave at the specified PID. If 'shutdown'
@@ -239,7 +238,7 @@ inline Cluster::Masters::~Masters()
 inline void Cluster::Masters::shutdown()
 {
   // TODO(benh): Use utils::copy from stout once namespaced.
-  std::map<process::PID<master::Master>, MasterInfo> copy(masters);
+  std::map<process::PID<master::Master>, Master> copy(masters);
   foreachkey (const process::PID<master::Master>& pid, copy) {
     stop(pid);
   }
@@ -255,7 +254,7 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
 
 
 inline Try<process::PID<master::Master> > Cluster::Masters::start(
-    Option<master::allocator::AllocatorProcess*> allocatorProcess,
+    const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
     const master::Flags& flags)
 {
   // Disallow multiple masters when not using ZooKeeper.
@@ -263,16 +262,16 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
     return Error("Can not start multiple masters when not using ZooKeeper");
   }
 
-  MasterInfo masterInfo;
+  Master master;
 
   if (allocatorProcess.isNone()) {
-    masterInfo.allocatorProcess =
+    master.allocatorProcess =
         new master::allocator::HierarchicalDRFAllocatorProcess();
-    masterInfo.allocator =
-        new master::allocator::Allocator(masterInfo.allocatorProcess);
+    master.allocator =
+        new master::allocator::Allocator(master.allocatorProcess);
   } else {
-    masterInfo.allocatorProcess = NULL;
-    masterInfo.allocator =
+    master.allocatorProcess = NULL;
+    master.allocator =
         new master::allocator::Allocator(allocatorProcess.get());
   }
 
@@ -281,44 +280,44 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
   }
 
   if (flags.registry == "in_memory") {
-    masterInfo.storage = new state::InMemoryStorage();
+    master.storage = new state::InMemoryStorage();
   } else {
     return Error("'" + flags.registry + "' is not a supported"
                  " option for registry persistence");
   }
 
-  CHECK_NOTNULL(masterInfo.storage);
+  CHECK_NOTNULL(master.storage);
 
-  masterInfo.state = new state::protobuf::State(masterInfo.storage);
-  masterInfo.registrar = new master::Registrar(flags, masterInfo.state);
-  masterInfo.repairer = new master::Repairer();
+  master.state = new state::protobuf::State(master.storage);
+  master.registrar = new master::Registrar(flags, master.state);
+  master.repairer = new master::Repairer();
 
   if (url.isSome()) {
-    masterInfo.contender = new ZooKeeperMasterContender(url.get());
-    masterInfo.detector = new ZooKeeperMasterDetector(url.get());
+    master.contender = new ZooKeeperMasterContender(url.get());
+    master.detector = new ZooKeeperMasterDetector(url.get());
   } else {
-    masterInfo.contender = new StandaloneMasterContender();
-    masterInfo.detector = new StandaloneMasterDetector();
+    master.contender = new StandaloneMasterContender();
+    master.detector = new StandaloneMasterDetector();
   }
 
-  masterInfo.master = new master::Master(
-      masterInfo.allocator,
-      masterInfo.registrar,
-      masterInfo.repairer,
+  master.master = new master::Master(
+      master.allocator,
+      master.registrar,
+      master.repairer,
       &cluster->files,
-      masterInfo.contender,
-      masterInfo.detector,
+      master.contender,
+      master.detector,
       flags);
 
   if (url.isNone()) {
     // This means we are using the StandaloneMasterDetector.
-    CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(masterInfo.detector))
-        ->appoint(masterInfo.master->info());
+    CHECK_NOTNULL(dynamic_cast<StandaloneMasterDetector*>(master.detector))
+        ->appoint(master.master->info());
   }
 
-  process::PID<master::Master> pid = process::spawn(masterInfo.master);
+  process::PID<master::Master> pid = process::spawn(master.master);
 
-  masters[pid] = masterInfo;
+  masters[pid] = master;
 
   return pid;
 }
@@ -331,22 +330,22 @@ inline Try<Nothing> Cluster::Masters::stop(
     return Error("No master found to stop");
   }
 
-  MasterInfo masterInfo = masters[pid];
+  Master master = masters[pid];
 
-  process::terminate(masterInfo.master);
-  process::wait(masterInfo.master);
-  delete masterInfo.master;
+  process::terminate(master.master);
+  process::wait(master.master);
+  delete master.master;
 
-  delete masterInfo.allocator; // Terminates and waits for allocator process.
-  delete masterInfo.allocatorProcess; // May be NULL.
+  delete master.allocator; // Terminates and waits for allocator process.
+  delete master.allocatorProcess; // May be NULL.
 
-  delete masterInfo.registrar;
-  delete masterInfo.repairer;
-  delete masterInfo.state;
-  delete masterInfo.storage;
+  delete master.registrar;
+  delete master.repairer;
+  delete master.state;
+  delete master.storage;
 
-  delete masterInfo.contender;
-  delete masterInfo.detector;
+  delete master.contender;
+  delete master.detector;
 
   masters.erase(pid);
 
@@ -422,12 +421,12 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
     slave::Containerizer* containerizer,
     const slave::Flags& flags)
 {
-  return start(containerizer, masters->detector(), flags);
+  return start(containerizer, None(), flags);
 }
 
 
 inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
-    process::Owned<MasterDetector> detector,
+    const Option<MasterDetector*>& detector,
     const slave::Flags& flags)
 {
   // TODO(benh): Create a work directory if using the default.
@@ -443,11 +442,17 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
 
   slave.containerizer = containerizer.get();
 
-  // Get a detector for the master(s).
-  slave.detector = detector;
+  // Get a detector for the master(s) if one wasn't provided.
+  if (detector.isNone()) {
+    slave.detector = masters->detector();
+  }
 
   slave.slave = new slave::Slave(
-      flags, slave.detector.get(), slave.containerizer, &cluster->files);
+      flags,
+      detector.get(slave.detector.get()),
+      slave.containerizer,
+      &cluster->files);
+
   process::PID<slave::Slave> pid = process::spawn(slave.slave);
 
   slaves[pid] = slave;
@@ -458,7 +463,7 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
 
 inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
     slave::Containerizer* containerizer,
-    process::Owned<MasterDetector> detector,
+    const Option<MasterDetector*>& detector,
     const slave::Flags& flags)
 {
   // TODO(benh): Create a work directory if using the default.
@@ -467,11 +472,17 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
 
   slave.flags = flags;
 
-  // Get a detector for the master(s).
-  slave.detector = detector;
+  // Get a detector for the master(s) if one wasn't provided.
+  if (detector.isNone()) {
+    slave.detector = masters->detector();
+  }
 
   slave.slave = new slave::Slave(
-      flags, slave.detector.get(), containerizer, &cluster->files);
+      flags,
+      detector.get(slave.detector.get()),
+      containerizer,
+      &cluster->files);
+
   process::PID<slave::Slave> pid = process::spawn(slave.slave);
 
   slaves[pid] = slave;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 99311c3..4796149 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -222,10 +222,9 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
+  StandaloneMasterDetector detector(master.get());
 
-  Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -327,7 +326,7 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
   // We now complete the partition on the slave side as well. This
   // is done by simulating a master loss event which would normally
   // occur during a network partition.
-  detector->appoint(None());
+  detector.appoint(None());
 
   Future<Nothing> shutdown;
   EXPECT_CALL(exec, shutdown(_))
@@ -336,7 +335,7 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
   shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
 
   // Have the slave re-register with the master.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   // Upon re-registration, the master will shutdown the slave.
   // The slave will then shut down the executor.
@@ -609,9 +608,8 @@ TEST_F(FaultToleranceTest, MasterFailover)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
-  TestingMesosSchedulerDriver driver(&sched, detector);
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector);
 
   Future<process::Message> frameworkRegisteredMessage =
     FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -641,7 +639,7 @@ TEST_F(FaultToleranceTest, MasterFailover)
     .WillOnce(FutureSatisfy(&registered2));
 
   // Simulate a new master detected message to the scheduler.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   // Scheduler should retry authentication.
   AWAIT_READY(authenticateMessage);
@@ -652,8 +650,6 @@ TEST_F(FaultToleranceTest, MasterFailover)
   driver.stop();
   driver.join();
 
-  delete detector;
-
   Shutdown();
 }
 
@@ -681,12 +677,12 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
   ASSERT_SOME(master);
 
   MockExecutor executor(DEFAULT_EXECUTOR_ID);
+
   TestContainerizer containerizer(&executor);
 
-  Owned<MasterDetector> slaveDetector(
-      new StandaloneMasterDetector(master.get()));
+  StandaloneMasterDetector slaveDetector(master.get());
 
-  Try<PID<Slave> > slave = StartSlave(&containerizer, slaveDetector);
+  Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
   ASSERT_SOME(slave);
 
   // Verify master/slave have 0 completed/running frameworks.
@@ -810,8 +806,8 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
-  dynamic_cast<StandaloneMasterDetector*>(slaveDetector.get())->appoint(
-      master.get());
+  // Simulate a new master detected message to the slave.
+  slaveDetector.appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -1045,19 +1041,17 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Owned<MasterDetector> slaveDetector(
-      new StandaloneMasterDetector(master.get()));
-  Try<PID<Slave> > slave = StartSlave(slaveDetector);
-  ASSERT_SOME(slave);
+  StandaloneMasterDetector slaveDetector(master.get());
 
+  Try<PID<Slave> > slave = StartSlave(&slaveDetector);
+  ASSERT_SOME(slave);
 
   // Create a detector for the scheduler driver because we want the
   // spurious leading master change to be known by the scheduler
   // driver only.
-  Owned<MasterDetector> schedDetector(
-      new StandaloneMasterDetector(master.get()));
+  StandaloneMasterDetector schedDetector(master.get());
   MockScheduler sched;
-  TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
+  TestingMesosSchedulerDriver driver(&sched, &schedDetector);
 
   Future<Nothing> registered;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -1093,8 +1087,7 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
     .Times(AtMost(1));
 
   // Simulate a spurious leading master change at the scheduler.
-  dynamic_cast<StandaloneMasterDetector*>(schedDetector.get())->appoint(
-      master.get());
+  schedDetector.appoint(master.get());
 
   AWAIT_READY(disconnected);
 
@@ -1119,9 +1112,8 @@ TEST_F(FaultToleranceTest, TaskLost)
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
-  TestingMesosSchedulerDriver driver(&sched, detector);
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector);
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1145,7 +1137,7 @@ TEST_F(FaultToleranceTest, TaskLost)
     .WillOnce(FutureSatisfy(&disconnected));
 
   // Simulate a spurious master loss event at the scheduler.
-  detector->appoint(None());
+  detector.appoint(None());
 
   AWAIT_READY(disconnected);
 
@@ -1301,15 +1293,14 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestContainerizer containerizer(&exec);
 
-  Owned<MasterDetector> slaveDetector(
-      new StandaloneMasterDetector(master.get()));
-  Try<PID<Slave> > slave = StartSlave(&containerizer, slaveDetector);
+  StandaloneMasterDetector slaveDetector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  Owned<StandaloneMasterDetector> schedDetector(
-      new StandaloneMasterDetector(master.get()));
-  TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
+  StandaloneMasterDetector schedDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &schedDetector);
 
   Future<process::Message> frameworkRegisteredMessage =
     FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -1361,8 +1352,7 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
-  dynamic_cast<StandaloneMasterDetector*>(slaveDetector.get())->appoint(
-      master.get());
+  slaveDetector.appoint(master.get());
 
   // Wait for the slave to re-register.
   AWAIT_READY(slaveReregisteredMessage);
@@ -1386,7 +1376,7 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
-  schedDetector->appoint(master.get());
+  schedDetector.appoint(master.get());
 
   AWAIT_READY(frameworkRegisteredMessage2);
 
@@ -1822,8 +1812,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  StandaloneMasterDetector* detector = new StandaloneMasterDetector(master.get());
-  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&detector);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -1848,7 +1839,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
 
   // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -1872,10 +1863,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestContainerizer containerizer(&exec);
 
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
-  Try<PID<Slave> > slave =
-    StartSlave(&containerizer, Owned<MasterDetector>(detector));
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1923,7 +1913,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status2));
 
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   AWAIT_READY(status2);
   EXPECT_EQ(TASK_LOST, status2.get().state());
@@ -1943,9 +1933,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
-  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&detector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1993,7 +1983,7 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
 
   // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave to force re-registration.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -2019,9 +2009,9 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
-  Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
+  StandaloneMasterDetector detector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -2078,7 +2068,7 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
 
   // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave to force re-registration.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index 8da7420..42051bf 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -102,7 +102,8 @@ TEST_F(MasterContenderDetectorTest, File)
 
   ASSERT_SOME(detector);
 
-  StartSlave(Owned<MasterDetector>(detector.get()), flags);
+  Try<PID<Slave> > slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
@@ -123,6 +124,8 @@ TEST_F(MasterContenderDetectorTest, File)
   driver.join();
 
   Shutdown();
+
+  delete detector.get();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 599f4a0..c047397 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -722,14 +722,13 @@ TEST_F(MasterTest, MasterInfoOnReElection)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  StandaloneMasterDetector* detector =
-    new StandaloneMasterDetector(master.get());
+  StandaloneMasterDetector detector(master.get());
 
-  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
+  Try<PID<Slave> > slave = StartSlave(&detector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  TestingMesosSchedulerDriver driver(&sched, detector);
+  TestingMesosSchedulerDriver driver(&sched, &detector);
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
@@ -761,7 +760,7 @@ TEST_F(MasterTest, MasterInfoOnReElection)
 
   // Simulate a spurious event (e.g., due to ZooKeeper
   // expiration) at the scheduler.
-  detector->appoint(master.get());
+  detector.appoint(master.get());
 
   AWAIT_READY(disconnected);
 
@@ -841,15 +840,13 @@ TEST_F(MasterTest, MasterLost)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Owned<StandaloneMasterDetector> detector(
-      new StandaloneMasterDetector());
-  detector->appoint(master.get());
+  StandaloneMasterDetector detector(master.get());
 
   Try<PID<Slave> > slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  TestingMesosSchedulerDriver driver(&sched, detector.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector);
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
@@ -869,7 +866,7 @@ TEST_F(MasterTest, MasterLost)
     .WillOnce(FutureSatisfy(&disconnected));
 
   // Simulate a spurious event at the scheduler.
-  detector->appoint(None());
+  detector.appoint(None());
 
   AWAIT_READY(disconnected);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index ae3aeee..a9844e4 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -176,7 +176,7 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
 
 Try<process::PID<slave::Slave> > MesosTest::StartSlave(
     slave::Containerizer* containerizer,
-    Owned<MasterDetector> detector,
+    MasterDetector* detector,
     const Option<slave::Flags>& flags)
 {
   return cluster.slaves.start(
@@ -187,7 +187,7 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
 
 
 Try<PID<slave::Slave> > MesosTest::StartSlave(
-    Owned<MasterDetector> detector,
+    MasterDetector* detector,
     const Option<slave::Flags>& flags)
 {
   return cluster.slaves.start(
@@ -197,7 +197,7 @@ Try<PID<slave::Slave> > MesosTest::StartSlave(
 
 Try<PID<slave::Slave> > MesosTest::StartSlave(
     MockExecutor* executor,
-    Owned<MasterDetector> detector,
+    MasterDetector* detector,
     const Option<slave::Flags>& flags)
 {
   slave::Containerizer* containerizer = new TestContainerizer(executor);

http://git-wip-us.apache.org/repos/asf/mesos/blob/bafce319/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index f77fbfe..7bc5e98 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -113,19 +113,19 @@ protected:
   // Starts a slave with the specified containerizer, detector and flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
       slave::Containerizer* containerizer,
-      process::Owned<MasterDetector> detector,
+      MasterDetector* detector,
       const Option<slave::Flags>& flags = None());
 
   // Starts a slave with the specified MasterDetector and flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
-      process::Owned<MasterDetector> detector,
+      MasterDetector* detector,
       const Option<slave::Flags>& flags = None());
 
   // Starts a slave with the specified mock executor, MasterDetector
   // and flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
       MockExecutor* executor,
-      process::Owned<MasterDetector> detector,
+      MasterDetector* detector,
       const Option<slave::Flags>& flags = None());
 
   // Stop the specified master.