You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:28 UTC

[08/10] git commit: Added a watch function to watch for network size changes.

Added a watch function to watch for network size changes.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/16064


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1e3b741
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1e3b741
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1e3b741

Branch: refs/heads/master
Commit: c1e3b741d2936340b6ea7170a1737d1e5d838d07
Parents: fa5d450
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:55:56 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:55:56 2014 -0800

----------------------------------------------------------------------
 src/log/network.hpp     | 110 +++++++++++++++++++++++++++++++++++++++++++
 src/tests/log_tests.cpp |  49 +++++++++++++++++++
 2 files changed, 159 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c1e3b741/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index 2b674f6..9c76bf8 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -22,6 +22,7 @@
 // TODO(benh): Eventually move and associate this code with the
 // libprocess protobuf code rather than keep it here.
 
+#include <list>
 #include <set>
 #include <string>
 
@@ -49,6 +50,16 @@ class NetworkProcess;
 class Network
 {
 public:
+  enum WatchMode
+  {
+    EQUAL_TO,
+    NOT_EQUAL_TO,
+    LESS_THAN,
+    LESS_THAN_OR_EQUAL_TO,
+    GREATER_THAN,
+    GREATER_THAN_OR_EQUAL_TO
+  };
+
   Network();
   Network(const std::set<process::UPID>& pids);
   virtual ~Network();
@@ -62,6 +73,14 @@ public:
   // Set the PIDs that are part of this network.
   void set(const std::set<process::UPID>& pids);
 
+  // Returns a future which gets set when the network size satisfies
+  // the constraint specified by 'size' and 'mode'. For example, if
+  // 'size' is 2 and 'mode' is GREATER_THAN, then the returned future
+  // will get set when the size of the network is greater than 2.
+  process::Future<size_t> watch(
+      size_t size,
+      WatchMode mode = NOT_EQUAL_TO) const;
+
   // Sends a request to each member of the network and returns a set
   // of futures that represent their responses.
   template <typename Req, typename Res>
@@ -135,12 +154,18 @@ public:
   {
     link(pid); // Try and keep a socket open (more efficient).
     pids.insert(pid);
+
+    // Update any pending watches.
+    update();
   }
 
   void remove(const process::UPID& pid)
   {
     // TODO(benh): unlink(pid);
     pids.erase(pid);
+
+    // Update any pending watches.
+    update();
   }
 
   void set(const std::set<process::UPID>& _pids)
@@ -149,6 +174,23 @@ public:
     foreach (const process::UPID& pid, _pids) {
       add(pid); // Also does a link.
     }
+
+    // Update any pending watches.
+    update();
+  }
+
+  process::Future<size_t> watch(size_t size, Network::WatchMode mode)
+  {
+    if (satisfied(size, mode)) {
+      return pids.size();
+    }
+
+    Watch* watch = new Watch(size, mode);
+    watches.push_back(watch);
+
+    // TODO(jieyu): Consider deleting 'watch' if the returned future
+    // is discarded by the user.
+    return watch->promise.future();
   }
 
   // Sends a request to each of the groups members and returns a set
@@ -185,12 +227,73 @@ public:
     return Nothing();
   }
 
+protected:
+  virtual void finalize()
+  {
+    foreach (Watch* watch, watches) {
+      watch->promise.fail("Network is being terminated");
+      delete watch;
+    }
+    watches.clear();
+  }
+
 private:
+  struct Watch
+  {
+    Watch(size_t _size, Network::WatchMode _mode)
+      : size(_size), mode(_mode) {}
+
+    size_t size;
+    Network::WatchMode mode;
+    process::Promise<size_t> promise;
+  };
+
   // Not copyable, not assignable.
   NetworkProcess(const NetworkProcess&);
   NetworkProcess& operator = (const NetworkProcess&);
 
+  // Notifies the change of the network.
+  void update()
+  {
+    const size_t size = watches.size();
+    for (size_t i = 0; i < size; i++) {
+      Watch* watch = watches.front();
+      watches.pop_front();
+
+      if (satisfied(watch->size, watch->mode)) {
+        watch->promise.set(pids.size());
+        delete watch;
+      } else {
+        watches.push_back(watch);
+      }
+    }
+  }
+
+  // Returns true if the current size of the network satisfies the
+  // constraint specified by 'size' and 'mode'.
+  bool satisfied(size_t size, Network::WatchMode mode)
+  {
+    switch (mode) {
+      case Network::EQUAL_TO:
+        return pids.size() == size;
+      case Network::NOT_EQUAL_TO:
+        return pids.size() != size;
+      case Network::LESS_THAN:
+        return pids.size() < size;
+      case Network::LESS_THAN_OR_EQUAL_TO:
+        return pids.size() <= size;
+      case Network::GREATER_THAN:
+        return pids.size() > size;
+      case Network::GREATER_THAN_OR_EQUAL_TO:
+        return pids.size() >= size;
+      default:
+        LOG(FATAL) << "Invalid watch mode";
+        break;
+    }
+  }
+
   std::set<process::UPID> pids;
+  std::list<Watch*> watches;
 };
 
 
@@ -234,6 +337,13 @@ inline void Network::set(const std::set<process::UPID>& pids)
 }
 
 
+inline process::Future<size_t> Network::watch(
+    size_t size, Network::WatchMode mode) const
+{
+  return process::dispatch(process, &NetworkProcess::watch, size, mode);
+}
+
+
 template <typename Req, typename Res>
 process::Future<std::set<process::Future<Res> > > Network::broadcast(
     const Protocol<Req, Res>& protocol,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c1e3b741/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index f866dde..033e8e5 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -28,6 +28,7 @@
 #include <process/gtest.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/shared.hpp>
 
@@ -68,6 +69,54 @@ using testing::Eq;
 using testing::Return;
 
 
+TEST(NetworkTest, Watch)
+{
+  UPID pid1 = ProcessBase().self();
+  UPID pid2 = ProcessBase().self();
+
+  Network network;
+
+  // Test the default parameter.
+  Future<size_t> future = network.watch(1u);
+  AWAIT_READY(future);
+  EXPECT_EQ(0u, future.get());
+
+  future = network.watch(2u, Network::NOT_EQUAL_TO);
+  AWAIT_READY(future);
+  EXPECT_EQ(0u, future.get());
+
+  future = network.watch(0u, Network::GREATER_THAN_OR_EQUAL_TO);
+  AWAIT_READY(future);
+  EXPECT_EQ(0u, future.get());
+
+  future = network.watch(1u, Network::LESS_THAN);
+  AWAIT_READY(future);
+  EXPECT_EQ(0u, future.get());
+
+  network.add(pid1);
+
+  future = network.watch(1u, Network::EQUAL_TO);
+  AWAIT_READY(future);
+  EXPECT_EQ(1u, future.get());
+
+  future = network.watch(1u, Network::GREATER_THAN);
+  ASSERT_TRUE(future.isPending());
+
+  network.add(pid2);
+
+  AWAIT_READY(future);
+  EXPECT_EQ(2u, future.get());
+
+  future = network.watch(1u, Network::LESS_THAN_OR_EQUAL_TO);
+  ASSERT_TRUE(future.isPending());
+
+  network.remove(pid2);
+
+  AWAIT_READY(future);
+  EXPECT_EQ(1u, future.get());
+}
+
+
 class ReplicaTest : public TemporaryDirectoryTest
 {
 protected: