You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:28 UTC
[08/10] git commit: Added a watch function to watch for network size
changes.
Added a watch function to watch for network size changes.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/16064
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1e3b741
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1e3b741
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1e3b741
Branch: refs/heads/master
Commit: c1e3b741d2936340b6ea7170a1737d1e5d838d07
Parents: fa5d450
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:55:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:55:56 2014 -0800
----------------------------------------------------------------------
src/log/network.hpp | 110 +++++++++++++++++++++++++++++++++++++++++++
src/tests/log_tests.cpp | 49 +++++++++++++++++++
2 files changed, 159 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c1e3b741/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index 2b674f6..9c76bf8 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -22,6 +22,7 @@
// TODO(benh): Eventually move and associate this code with the
// libprocess protobuf code rather than keep it here.
+#include <list>
#include <set>
#include <string>
@@ -49,6 +50,16 @@ class NetworkProcess;
class Network
{
public:
+ enum WatchMode // Comparison that watch() applies between the network size and the requested 'size'.
+ {
+ EQUAL_TO, // network size == 'size'
+ NOT_EQUAL_TO, // network size != 'size' (default mode of watch())
+ LESS_THAN, // network size < 'size'
+ LESS_THAN_OR_EQUAL_TO, // network size <= 'size'
+ GREATER_THAN, // network size > 'size'
+ GREATER_THAN_OR_EQUAL_TO // network size >= 'size'
+ };
+
Network();
Network(const std::set<process::UPID>& pids);
virtual ~Network();
@@ -62,6 +73,14 @@ public:
// Set the PIDs that are part of this network.
void set(const std::set<process::UPID>& pids);
+ // Returns a future which gets set to the network size once that
+ // size satisfies the constraint specified by 'size' and 'mode'
+ // (NOT_EQUAL_TO unless given). For example, if 'size' is 2 and 'mode'
+ // is GREATER_THAN, it gets set when the network size exceeds 2.
+ process::Future<size_t> watch(
+ size_t size,
+ WatchMode mode = NOT_EQUAL_TO) const;
+
// Sends a request to each member of the network and returns a set
// of futures that represent their responses.
template <typename Req, typename Res>
@@ -135,12 +154,18 @@ public:
{
link(pid); // Try and keep a socket open (more efficient).
pids.insert(pid);
+
+ // Update any pending watches.
+ update();
}
void remove(const process::UPID& pid)
{
// TODO(benh): unlink(pid);
pids.erase(pid);
+
+ // Update any pending watches.
+ update();
}
void set(const std::set<process::UPID>& _pids)
@@ -149,6 +174,23 @@ public:
foreach (const process::UPID& pid, _pids) {
add(pid); // Also does a link.
}
+
+ // Update any pending watches.
+ update();
+ }
+
+ process::Future<size_t> watch(size_t size, Network::WatchMode mode)
+ {
+ if (satisfied(size, mode)) { // Constraint already holds for the current members.
+ return pids.size(); // Answer with the current network size; nothing is queued.
+ }
+
+ Watch* watch = new Watch(size, mode); // Freed by update() once satisfied, or by finalize().
+ watches.push_back(watch);
+
+ // TODO(jieyu): Consider deleting 'watch' if the returned future
+ // is discarded by the user.
+ return watch->promise.future();
}
// Sends a request to each of the groups members and returns a set
@@ -185,12 +227,73 @@ public:
return Nothing();
}
+protected:
+  // Fails the promise of each watch that is still pending and frees
+  // it, leaving 'watches' empty.
+  virtual void finalize()
+  {
+    while (!watches.empty()) {
+      Watch* pending = watches.front();
+      watches.pop_front();
+
+      pending->promise.fail("Network is being terminated");
+      delete pending;
+    }
+  }
+
private:
+  // Bookkeeping for one pending watch() request: the constraint that
+  // was asked for and the promise backing the future handed out.
+  struct Watch
+  {
+    Watch(size_t threshold, Network::WatchMode comparison)
+      : size(threshold),
+        mode(comparison) {}
+
+    // Value the network size is compared against.
+    size_t size;
+
+    // Comparison to apply between the network size and 'size'.
+    Network::WatchMode mode;
+
+    // Set to the network size by update(), or failed by finalize().
+    process::Promise<size_t> promise;
+  };
+
// Not copyable, not assignable.
NetworkProcess(const NetworkProcess&);
NetworkProcess& operator = (const NetworkProcess&);
+  // Re-evaluates every pending watch against the current network
+  // size. A watch whose constraint now holds has its promise set to
+  // the current size and is freed; all other watches stay queued in
+  // their original relative order.
+  void update()
+  {
+    // Take the pending watches out of the member list and put back
+    // only those that are still unsatisfied.
+    std::list<Watch*> pending;
+    pending.swap(watches);
+
+    while (!pending.empty()) {
+      Watch* candidate = pending.front();
+      pending.pop_front();
+
+      if (!satisfied(candidate->size, candidate->mode)) {
+        watches.push_back(candidate);
+        continue;
+      }
+
+      candidate->promise.set(pids.size());
+      delete candidate;
+    }
+  }
+
+  // Returns true if the current size of the network satisfies the
+  // constraint specified by 'size' and 'mode'. A 'mode' outside of
+  // Network::WatchMode is a programming error and is reported through
+  // LOG(FATAL).
+  bool satisfied(size_t size, Network::WatchMode mode)
+  {
+    // No 'default' label on purpose: this way the compiler can warn
+    // when a newly added WatchMode enumerator is not handled here.
+    switch (mode) {
+      case Network::EQUAL_TO:
+        return pids.size() == size;
+      case Network::NOT_EQUAL_TO:
+        return pids.size() != size;
+      case Network::LESS_THAN:
+        return pids.size() < size;
+      case Network::LESS_THAN_OR_EQUAL_TO:
+        return pids.size() <= size;
+      case Network::GREATER_THAN:
+        return pids.size() > size;
+      case Network::GREATER_THAN_OR_EQUAL_TO:
+        return pids.size() >= size;
+    }
+
+    // Every valid mode returned above, so 'mode' holds an invalid
+    // value here. The explicit return guarantees that no control path
+    // flows off the end of this non-void function.
+    LOG(FATAL) << "Invalid watch mode";
+    return false;
+  }
+
std::set<process::UPID> pids;
+ std::list<Watch*> watches;
};
@@ -234,6 +337,13 @@ inline void Network::set(const std::set<process::UPID>& pids)
}
+inline process::Future<size_t> Network::watch(
+ size_t size, Network::WatchMode mode) const
+{
+ return process::dispatch(process, &NetworkProcess::watch, size, mode); // Forwards to NetworkProcess::watch via dispatch on 'process'.
+}
+
+
template <typename Req, typename Res>
process::Future<std::set<process::Future<Res> > > Network::broadcast(
const Protocol<Req, Res>& protocol,
http://git-wip-us.apache.org/repos/asf/mesos/blob/c1e3b741/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index f866dde..033e8e5 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -28,6 +28,7 @@
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
+#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/shared.hpp>
@@ -68,6 +69,54 @@ using testing::Eq;
using testing::Return;
+TEST(NetworkTest, Watch) // Exercises Network::watch() with every WatchMode, for constraints that already hold and ones that must wait.
+{
+ UPID pid1 = ProcessBase().self(); // PIDs used as network members below.
+ UPID pid2 = ProcessBase().self();
+
+ Network network;
+
+ // Test the default parameter (NOT_EQUAL_TO); expected size is 0, which differs from 1.
+ Future<size_t> future = network.watch(1u);
+ AWAIT_READY(future);
+ EXPECT_EQ(0u, future.get());
+
+ future = network.watch(2u, Network::NOT_EQUAL_TO);
+ AWAIT_READY(future);
+ EXPECT_EQ(0u, future.get());
+
+ future = network.watch(0u, Network::GREATER_THAN_OR_EQUAL_TO);
+ AWAIT_READY(future);
+ EXPECT_EQ(0u, future.get());
+
+ future = network.watch(1u, Network::LESS_THAN);
+ AWAIT_READY(future);
+ EXPECT_EQ(0u, future.get());
+
+ network.add(pid1); // Expected network size from here on: 1.
+
+ future = network.watch(1u, Network::EQUAL_TO);
+ AWAIT_READY(future);
+ EXPECT_EQ(1u, future.get());
+
+ future = network.watch(1u, Network::GREATER_THAN);
+ ASSERT_TRUE(future.isPending()); // Size 1 is not > 1, so the watch has to wait.
+
+ network.add(pid2); // Growing to 2 members should satisfy the pending watch.
+
+ AWAIT_READY(future);
+ EXPECT_EQ(2u, future.get());
+
+ future = network.watch(1u, Network::LESS_THAN_OR_EQUAL_TO);
+ ASSERT_TRUE(future.isPending()); // Size 2 is not <= 1, so the watch has to wait.
+
+ network.remove(pid2); // Shrinking back to 1 member should satisfy the pending watch.
+
+ AWAIT_READY(future);
+ EXPECT_EQ(1u, future.get());
+}
+
+
class ReplicaTest : public TemporaryDirectoryTest
{
protected: