You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:38:58 UTC

[01/30] mesos git commit: Abstract clock internals from ProcessManager::settle.

Repository: mesos
Updated Branches:
  refs/heads/master acd656c4e -> 8279b45ed


Abstract clock internals from ProcessManager::settle.

Review: https://reviews.apache.org/r/27498


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84efa184
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84efa184
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84efa184

Branch: refs/heads/master
Commit: 84efa184159ebaa7f5110073358d35773d149f0e
Parents: 788a136
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:03:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp  |  22 +++-
 3rdparty/libprocess/include/process/future.hpp |  22 +++-
 3rdparty/libprocess/src/process.cpp            | 117 ++++++++++++++------
 3 files changed, 121 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index 80190ef..d60742a 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -39,11 +39,27 @@ public:
 
   static void order(ProcessBase* from, ProcessBase* to);
 
-  // When the clock is paused, settle() synchronously ensures that:
+  // When the clock is paused this returns only after
   //   (1) all expired timers are executed,
-  //   (2) no Processes are running, and
-  //   (3) no Processes are ready to run.
+  //   (2) no processes are running, and
+  //   (3) no processes are ready to run.
+  //
+  // In other words, this function blocks synchronously until no other
+  // execution on any processes will occur unless the clock is
+  // advanced.
+  //
+  // TODO(benh): Move this function elsewhere, for instance, to a
+  // top-level function in the 'process' namespace since it deals with
+  // both processes and the clock.
   static void settle();
+
+  // When the clock is paused this returns true if all timers that
+  // expire before the paused time have executed, otherwise false.
+  // Note that if the clock continually gets advanced concurrently
+  // this function may never return true because the "paused" time
+  // will continue to get pushed out farther in the future making more
+  // timers candidates for execution.
+  static bool settled();
 };
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 68e5f7b..2e4f9ef 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -1121,7 +1121,7 @@ bool Future<T>::hasDiscard() const
 
 namespace internal {
 
-inline void awaited(const Owned<Latch>& latch)
+inline void awaited(Owned<Latch> latch)
 {
   latch->trigger();
 }
@@ -1132,18 +1132,32 @@ inline void awaited(const Owned<Latch>& latch)
 template <typename T>
 bool Future<T>::await(const Duration& duration) const
 {
-  Owned<Latch> latch;
+  // NOTE: We need to preemptively allocate the Latch on the heap
+  // instead of lazily creating it in the critical section below because
+  // instantiating a Latch requires creating a new process (at the
+  // time of writing this comment) which might need to do some
+  // synchronization in libprocess which might deadlock if some other
+  // code in libprocess is already holding a lock and then attempts to
+  // do Promise::set (or something similar) that attempts to acquire
+  // the lock that we acquire here. This is an artifact of using
+  // Future/Promise within the implementation of libprocess.
+  //
+  // We mostly only call 'await' in tests so this should not be a
+  // performance concern.
+  Owned<Latch> latch(new Latch());
+
+  bool pending = false;

   internal::acquire(&data->lock);
   {
     if (data->state == PENDING) {
-      latch.reset(new Latch());
+      pending = true;
       data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
     }
   }
   internal::release(&data->lock);

-  if (latch.get() != NULL) {
+  if (pending) {
     return latch->await(duration);
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9ebac08..e7e5520 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -43,6 +43,7 @@
 
 #include <boost/shared_array.hpp>
 
+#include <process/check.hpp>
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -475,11 +476,6 @@ static queue<lambda::function<void(void)> >* functions =
 static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
 static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
-// For supporting Clock::settle(), true if timers have been removed
-// from 'timeouts' but may not have been executed yet. Protected by
-// the timeouts lock. This is only used when the clock is paused.
-static bool pending_timers = false;
-
 // Flag to indicate whether or to update the timer on async interrupt.
 static bool update_timer = false;
 
@@ -525,6 +521,11 @@ Duration advanced = Duration::zero();
 
 bool paused = false;
 
+// For supporting Clock::settled(), false if we're not currently
+// settling (or we're not paused), true if we're currently attempting
+// to settle (and we're paused).
+bool settling = false;
+
 } // namespace clock {
 
 
@@ -596,6 +597,7 @@ void Clock::resume()
     if (clock::paused) {
       VLOG(2) << "Clock resumed at " << clock::current;
       clock::paused = false;
+      clock::settling = false;
       clock::currents->clear();
       update_timer = true;
       ev_async_send(loop, &async_watcher);
@@ -667,6 +669,7 @@ void Clock::update(ProcessBase* process, const Time& time)
 
 void Clock::order(ProcessBase* from, ProcessBase* to)
 {
+  VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
   update(to, now(from));
 }
 
@@ -678,6 +681,29 @@ void Clock::settle()
 }
 
 
+bool Clock::settled()
+{
+  synchronized (timeouts) {
+    CHECK(clock::paused);
+
+    if (update_timer) {
+      return false;
+    } else if (clock::settling) {
+      VLOG(3) << "Clock still not settled";
+      return false;
+    } else if (timeouts->size() == 0 ||
+               timeouts->begin()->first > clock::current) {
+      VLOG(3) << "Clock is settled";
+      return true;
+    }
+
+    VLOG(3) << "Clock is not settled";
+
+    return false;
+  }
+}
+
+
 Try<Time> Time::create(double seconds)
 {
   Try<Duration> duration = Duration::create(seconds);
@@ -878,9 +904,12 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 
       VLOG(3) << "Have timeout(s) at " << timeout;
 
-      // Record that we have pending timers to execute so the
-      // Clock::settle() operation can wait until we're done.
-      pending_timers = true;
+      // Need to toggle 'settling' so that we don't prematurely say
+      // we're settled until after the timers are executed below,
+      // outside of the critical section.
+      if (clock::paused) {
+        clock::settling = true;
+      }
 
       foreach (const Timer& timer, (*timeouts)[timeout]) {
         timedout.push_back(timer);
@@ -943,11 +972,16 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
     timer();
   }

-  // Mark ourselves as done executing the timers since it's now safe
-  // for a call to Clock::settle() to check if there will be any
-  // future timeouts reached.
+  // Now that we've finished executing the expired timers, mark
+  // 'settling' as false if the clock is paused and no remaining
+  // timeouts expire at or before the paused time.
   synchronized (timeouts) {
-    pending_timers = false;
+    if (clock::paused &&
+        (timeouts->size() == 0 ||
+         timeouts->begin()->first > clock::current)) {
+      VLOG(3) << "Clock has settled";
+      clock::settling = false;
+    }
   }
 }
 
@@ -2961,7 +2995,15 @@ bool ProcessManager::wait(const UPID& pid)
           list<ProcessBase*>::iterator it =
             find(runq.begin(), runq.end(), process);
           if (it != runq.end()) {
+            // Found it! Remove it from the run queue since we'll be
+            // donating our thread and also increment 'running' before
+            // leaving this 'runq' protected critical section so that
+            // anyone waiting for the processes to settle
+            // continues to wait (otherwise they could see nothing in
+            // 'runq' and 'running' equal to 0 between when we exit
+            // this critical section and increment 'running').
             runq.erase(it);
+            __sync_fetch_and_add(&running, 1);
           } else {
             // Another thread has resumed the process ...
             process = NULL;
@@ -2977,7 +3019,6 @@ bool ProcessManager::wait(const UPID& pid)
   if (process != NULL) {
     VLOG(2) << "Donating thread to " << process->pid << " while waiting";
     ProcessBase* donator = __process__;
-    __sync_fetch_and_add(&running, 1);
     process_manager->resume(process);
     __process__ = donator;
   }
@@ -3046,30 +3087,39 @@ void ProcessManager::settle()
 {
   bool done = true;
   do {
+    // While refactoring in order to isolate libev behind abstractions
+    // it became evident that this os::sleep is vital for tests to
+    // pass. In particular, there are certain tests that assume too
+    // much before they attempt to do a settle. One such example is
+    // tests doing http::get followed by Clock::settle, where they
+    // expect the http::get will have properly enqueued a process on
+    // the run queue but http::get is just sending bytes on a
+    // socket. Without sleeping at the beginning of this function we
+    // can get unlucky and appear settled when in actuality the
+    // kernel just hasn't copied the bytes to a socket or we haven't
+    // yet read the bytes and enqueued an event on a process (and the
+    // process on the run queue).
     os::sleep(Milliseconds(10));
-    done = true;
-    // Hopefully this is the only place we acquire both these locks.
-    synchronized (runq) {
-      synchronized (timeouts) {
-        CHECK(Clock::paused()); // Since another thread could resume the clock!

-        if (!runq.empty()) {
-          done = false;
-        }
+    done = true; // Start by assuming that we are settled.

-        __sync_synchronize(); // Read barrier for 'running'.
-        if (running > 0) {
-          done = false;
-        }
+    synchronized (runq) {
+      if (!runq.empty()) {
+        done = false;
+        continue;
+      }

-        if (timeouts->size() > 0 &&
-            timeouts->begin()->first <= clock::current) {
-          done = false;
-        }
+      // Read barrier for 'running'.
+      __sync_synchronize();

-        if (pending_timers) {
-          done = false;
-        }
+      if (running > 0) {
+        done = false;
+        continue;
+      }
+
+      if (!Clock::settled()) {
+        done = false;
+        continue;
       }
     }
   } while (!done);
@@ -3173,7 +3223,8 @@ Timer Clock::timer(
 
   Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
 
-  VLOG(3) << "Created a timer for " << timeout.time();
+  VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
+          << " in the future (" << timeout.time() << ")";
 
   // Add the timer.
   synchronized (timeouts) {


[02/30] mesos git commit: Updates to Mesos for Timer::create/cancel -> Clock::timer/cancel.

Posted by be...@apache.org.
Updates to Mesos for Timer::create/cancel -> Clock::timer/cancel.

Review: https://reviews.apache.org/r/27497


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/788a136f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/788a136f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/788a136f

Branch: refs/heads/master
Commit: 788a136f3a43a32ce64f7b2ef35a1005f2528dbd
Parents: 303ee1c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:23:01 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800

----------------------------------------------------------------------
 src/launcher/executor.cpp | 2 +-
 src/log/catchup.cpp       | 2 +-
 src/master/master.cpp     | 4 ++--
 src/slave/gc.cpp          | 2 +-
 src/slave/slave.cpp       | 6 +++---
 src/zookeeper/group.cpp   | 4 ++--
 6 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index cbc8750..6175bf5 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -378,7 +378,7 @@ private:
     TaskState state;
     string message;
 
-    Timer::cancel(escalationTimer);
+    Clock::cancel(escalationTimer);
 
     if (!status_.isReady()) {
       state = TASK_FAILED;

http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index 6315a85..f7afc38 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -238,7 +238,7 @@ private:
       .onFailed(defer(self(), &Self::failed))
       .onReady(defer(self(), &Self::succeeded));
 
-    Timer::create(timeout, lambda::bind(&Self::timedout, catching));
+    Clock::timer(timeout, lambda::bind(&Self::timedout, catching));
   }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 83c2f8a..b5fa8d1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -773,7 +773,7 @@ void Master::finalize()
   // TODO(vinod): This seems to be a bug in libprocess or the
   // testing infrastructure.
   if (slaves.recoveredTimer.isSome()) {
-    Timer::cancel(slaves.recoveredTimer.get());
+    Clock::cancel(slaves.recoveredTimer.get());
   }
 
   terminate(whitelistWatcher);
@@ -4829,7 +4829,7 @@ void Master::removeOffer(Offer* offer, bool rescind)
   // Remove and cancel offer removal timers. Canceling the Timers is
   // only done to avoid having too many active Timers in libprocess.
   if (offerTimers.contains(offer->id())) {
-    Timer::cancel(offerTimers[offer->id()]);
+    Clock::cancel(offerTimers[offer->id()]);
     offerTimers.erase(offer->id());
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/slave/gc.cpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.cpp b/src/slave/gc.cpp
index 73136dd..6042277 100644
--- a/src/slave/gc.cpp
+++ b/src/slave/gc.cpp
@@ -113,7 +113,7 @@ bool GarbageCollectorProcess::unschedule(const string& path)
 // existing timer.
 void GarbageCollectorProcess::reset()
 {
-  Timer::cancel(timer); // Cancel the existing timer, if any.
+  Clock::cancel(timer); // Cancel the existing timer, if any.
   if (!paths.empty()) {
     Timeout removalTime = (*paths.begin()).first; // Get the first entry.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 275081c..06b2e18 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -795,7 +795,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       // If we don't get a ping from the master, trigger a
       // re-registration. This needs to be done once registered,
       // in case we never receive an initial ping.
-      Timer::cancel(pingTimer);
+      Clock::cancel(pingTimer);
 
       pingTimer = delay(
           MASTER_PING_TIMEOUT(),
@@ -2577,7 +2577,7 @@ void Slave::pingOld(const UPID& from, const string& body)
   // longer considers the slave to be registered, so it is
   // essential for the slave to attempt a re-registration
   // when this occurs.
-  Timer::cancel(pingTimer);
+  Clock::cancel(pingTimer);
 
   pingTimer = delay(
       MASTER_PING_TIMEOUT(),
@@ -2609,7 +2609,7 @@ void Slave::ping(const UPID& from, bool connected)
   // longer considers the slave to be registered, so it is
   // essential for the slave to attempt a re-registration
   // when this occurs.
-  Timer::cancel(pingTimer);
+  Clock::cancel(pingTimer);
 
   pingTimer = delay(
       MASTER_PING_TIMEOUT(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 7dee0a1..173caa8 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -332,7 +332,7 @@ void GroupProcess::connected(int64_t sessionId, bool reconnect)
 
   // Cancel and cleanup the reconnect timer (if necessary).
   if (timer.isSome()) {
-    Timer::cancel(timer.get());
+    Clock::cancel(timer.get());
     timer = None();
   }
 
@@ -476,7 +476,7 @@ void GroupProcess::expired(int64_t sessionId)
 
   // Cancel and cleanup the reconnect timer (if necessary).
   if (timer.isSome()) {
-    Timer::cancel(timer.get());
+    Clock::cancel(timer.get());
     timer = None();
   }
 


[11/30] mesos git commit: Started moving libev specific functionality out of process.cpp.

Posted by be...@apache.org.
Started moving libev specific functionality out of process.cpp.

Review: https://reviews.apache.org/r/27504


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f78ae663
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f78ae663
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f78ae663

Branch: refs/heads/master
Commit: f78ae6635ce0aae89b1b9bd3dd1eae02c1da0936
Parents: 0e19796
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:19:23 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |  2 +
 3rdparty/libprocess/src/libev.cpp   | 21 ++++++++++
 3rdparty/libprocess/src/libev.hpp   | 70 ++++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 64 ++++-------------------------
 4 files changed, 100 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 0008e68..2de9989 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -38,6 +38,8 @@ libprocess_la_SOURCES =		\
   src/help.cpp			\
   src/http.cpp			\
   src/latch.cpp			\
+  src/libev.hpp			\
+  src/libev.cpp			\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
   src/process.cpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
new file mode 100644
index 0000000..efc89d8
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -0,0 +1,21 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <stout/lambda.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Defines the initial values for all of the declarations made in
+// libev.hpp (since these need to live in the static data space).
+struct ev_loop* loop = NULL;
+ev_async async_watcher;
+ev_io server_watcher;
+std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+synchronizable(watchers);
+std::queue<lambda::function<void(void)>>* functions =
+  new std::queue<lambda::function<void(void)>>();
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
new file mode 100644
index 0000000..bac8b6a
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -0,0 +1,70 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+
+#include "synchronized.hpp"
+
+namespace process {
+
+// Event loop.
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting the loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
+extern ev_async async_watcher;
+
+// Server watcher for accepting connections.
+extern ev_io server_watcher;
+
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
+extern std::queue<ev_io*>* watchers;
+extern synchronizable(watchers);
+
+// Queue of functions to be invoked asynchronously within the event
+// loop (protected by 'watchers' above).
+extern std::queue<lambda::function<void(void)>>* functions;
+
+
+// Wrapper around function we want to run in the event loop.
+template <typename T>
+void _run_in_event_loop(
+    const lambda::function<Future<T>(void)>& f,
+    const Owned<Promise<T>>& promise)
+{
+  // Don't bother running the function if the future has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+  } else {
+    promise->set(f());
+  }
+}
+
+
+// Helper for running a function in the event loop.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
+{
+  Owned<Promise<T>> promise(new Promise<T>());
+
+  Future<T> future = promise->future();
+
+  // Enqueue the function.
+  synchronized (watchers) {
+    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+  }
+
+  // Interrupt the loop.
+  ev_async_send(loop, &async_watcher);
+
+  return future;
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bac4200..d96d881 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -83,6 +83,7 @@
 #include "decoder.hpp"
 #include "encoder.hpp"
 #include "gate.hpp"
+#include "libev.hpp"
 #include "process_reference.hpp"
 #include "synchronized.hpp"
 
@@ -446,28 +447,6 @@ static SocketManager* socket_manager = NULL;
 // Active ProcessManager (eventually will probably be thread-local).
 static ProcessManager* process_manager = NULL;
 
-// Event loop.
-struct ev_loop* loop = NULL;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with IO watchers and functions (via run_in_event_loop).
-static ev_async async_watcher;
-
-// Server watcher for accepting connections.
-static ev_io server_watcher;
-
-// Queue of I/O watchers to be asynchronously added to the event loop
-// (protected by 'watchers' below).
-// TODO(benh): Replace this queue with functions that we put in
-// 'functions' below that perform the ev_io_start themselves.
-static queue<ev_io*>* watchers = new queue<ev_io*>();
-static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
-
-// Queue of functions to be invoked asynchronously within the vent
-// loop (protected by 'watchers' below).
-static queue<lambda::function<void(void)> >* functions =
-  new queue<lambda::function<void(void)> >();
-
 // Scheduling gate that threads wait at when there is nothing to run.
 static Gate* gate = new Gate();
 
@@ -593,40 +572,6 @@ static Message* parse(Request* request)
   return message;
 }
 
-// Wrapper around function we want to run in the event loop.
-template <typename T>
-void _run_in_event_loop(
-    const lambda::function<Future<T>(void)>& f,
-    const Owned<Promise<T> >& promise)
-{
-  // Don't bother running the function if the future has been discarded.
-  if (promise->future().hasDiscard()) {
-    promise->discard();
-  } else {
-    promise->set(f());
-  }
-}
-
-
-// Helper for running a function in the event loop.
-template <typename T>
-Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
-{
-  Owned<Promise<T> > promise(new Promise<T>());
-
-  Future<T> future = promise->future();
-
-  // Enqueue the function.
-  synchronized (watchers) {
-    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
-  }
-
-  // Interrupt the loop.
-  ev_async_send(loop, &async_watcher);
-
-  return future;
-}
-
 
 void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 {
@@ -1280,7 +1225,12 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize, listen";
   }
 
-  // Setup event loop.
+  // Initialize libev.
+  //
+  // TODO(benh): Eventually move this all out of process.cpp after
+  // more is disentangled.
+  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
 #ifdef __sun__
   loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
 #else


[29/30] mesos git commit: Virtualize backup(length) and remaining() in Encoder interface.

Posted by be...@apache.org.
Virtualize backup(length) and remaining() in Encoder interface.

These are top-level concepts implemented by both the data and file encoders.

Review: https://reviews.apache.org/r/27963


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3564c446
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3564c446
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3564c446

Branch: refs/heads/master
Commit: 3564c44642cda195c60e8f4914c59b49dbdff576
Parents: 4a79a69
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:48:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3564c446/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 8040d23..7afde0e 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -49,6 +49,10 @@ public:
 
   virtual Kind kind() const = 0;
 
+  virtual void backup(size_t length) = 0;
+
+  virtual size_t remaining() const = 0;
+
   Socket socket() const
   {
     return s;


[28/30] mesos git commit: Introduce Kind enumeration in Encoder.

Posted by be...@apache.org.
Introduce Kind enumeration in Encoder.

Introduce a Kind enumeration in Encoder to distinguish between data and file
encoders rather than using a function pointer.

Review: https://reviews.apache.org/r/27962


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a79a690
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a79a690
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a79a690

Branch: refs/heads/master
Commit: 4a79a690d02f0910beba4125f0d0c4f0c2f01f46
Parents: 541e8e5
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:47:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4a79a690/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 800f324..8040d23 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -37,11 +37,18 @@ typedef void (*Sender)(Encoder*);
 class Encoder
 {
 public:
+  enum Kind {
+    DATA,
+    FILE
+  };
+
   explicit Encoder(const Socket& _s) : s(_s) {}
   virtual ~Encoder() {}
 
   virtual Sender sender() = 0;
 
+  virtual Kind kind() const = 0;
+
   Socket socket() const
   {
     return s;
@@ -65,6 +72,11 @@ public:
     return send_data;
   }
 
+  virtual Kind kind() const
+  {
+    return Encoder::DATA;
+  }
+
   virtual const char* next(size_t* length)
   {
     size_t temp = index;
@@ -243,6 +255,11 @@ public:
     return send_file;
   }
 
+  virtual Kind kind() const
+  {
+    return Encoder::FILE;
+  }
+
   virtual int next(off_t* offset, size_t* length)
   {
     off_t temp = index;


[19/30] mesos git commit: Use Socket::read for ignore_data.

Posted by be...@apache.org.
Use Socket::read for ignore_data.

Review: https://reviews.apache.org/r/27961


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541e8e5b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541e8e5b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541e8e5b

Branch: refs/heads/master
Commit: 541e8e5b2b8353c62fbe42d6e63ca15809b2fdb7
Parents: 383f547
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:47:18 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 89 +++++++++++++++-----------------
 1 file changed, 43 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/541e8e5b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c37d522..63f9df4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,48 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-// A variant of 'recv_data' that doesn't do anything with the
-// data. Used by sockets created via SocketManager::link as well as
-// SocketManager::send(Message) where we don't care about the data
-// received we mostly just want to know when the socket has been
-// closed.
-void ignore_data(Socket* socket, int s)
-{
-  while (true) {
-    const ssize_t size = 80 * 1024;
-    ssize_t length = 0;
-
-    char data[size];
-
-    length = recv(s, data, size, 0);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&ignore_data, socket, s));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while receiving: " << error;
-      } else {
-        VLOG(2) << "Socket closed while receiving";
-      }
-      socket_manager->close(s);
-      delete socket;
-      break;
-    } else {
-      VLOG(2) << "Ignoring " << length << " bytes of data received "
-              << "on socket used only for sending";
-    }
-  }
-}
-
-
 void send_data(Encoder* e)
 {
   DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
@@ -1528,6 +1486,31 @@ Socket SocketManager::accepted(int s)
 
 namespace internal {
 
+// Continuation for a read whose contents are deliberately ignored.
+// Re-issues reads into the same 'data' buffer until a read fails, is
+// discarded, or reports zero bytes (presumably the peer closed the
+// connection). At that point the socket is closed, and both 'socket'
+// and 'data' are freed.
+void ignore_read_data(
+    const Future<size_t>& length,
+    Socket* socket,
+    char* data,
+    size_t size)
+{
+  // Short-circuiting guarantees 'get()' is only called on a ready future.
+  const bool done =
+    length.isDiscarded() || length.isFailed() || length.get() == 0;
+
+  if (!done) {
+    // Read (and ignore) the next chunk.
+    socket->read(data, size)
+      .onAny(lambda::bind(&ignore_read_data, lambda::_1, socket, data, size));
+    return;
+  }
+
+  socket_manager->close(*socket);
+  delete[] data;
+  delete socket;
+}
+
+
 void link_connect(const Future<Socket>& socket)
 {
   if (socket.isDiscarded() || socket.isFailed()) {
@@ -1538,8 +1521,15 @@ void link_connect(const Future<Socket>& socket)
     return;
   }
 
-  io::poll(socket.get(), io::READ)
-    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+  size_t size = 80 * 1024;
+  char* data = new char[size];
+  socket.get().read(data, size)
+    .onAny(lambda::bind(
+        &ignore_read_data,
+        lambda::_1,
+        new Socket(socket.get()),
+        data,
+        size));
 }
 
 } // namespace internal {
@@ -1677,8 +1667,15 @@ void send_connect(const Future<Socket>& socket, Message* message)
   // Read and ignore data from this socket. Note that we don't
   // expect to receive anything other than HTTP '202 Accepted'
   // responses which we just ignore.
-  io::poll(socket.get(), io::READ)
-    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+  size_t size = 80 * 1024;
+  char* data = new char[size];
+  socket.get().read(data, size)
+    .onAny(lambda::bind(
+        &ignore_read_data,
+        lambda::_1,
+        new Socket(socket.get()),
+        data,
+        size));
 
   // Start polling in order to send data.
   io::poll(socket.get(), io::WRITE)


[07/30] mesos git commit: Removed redundant synchronization and conditional check.

Posted by be...@apache.org.
Removed redundant synchronization and conditional check.

Review: https://reviews.apache.org/r/27500


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6df6d02c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6df6d02c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6df6d02c

Branch: refs/heads/master
Commit: 6df6d02c2669575037fb0fb5bbbb4d6f1ccdccc1
Parents: b1f2bb9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:57:21 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6df6d02c/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9551d99..0995a9c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2444,11 +2444,7 @@ void SocketManager::exited(ProcessBase* process)
       if (linkee == pid) {
         foreach (ProcessBase* linker, processes) {
           CHECK(linker != process) << "Process linked with itself";
-          synchronized (timeouts) {
-            if (Clock::paused()) {
-              Clock::update(linker, time);
-            }
-          }
+          Clock::update(linker, time);
           linker->enqueue(new ExitedEvent(linkee));
         }
       }


[08/30] mesos git commit: Introduced a callback for timer expiration.

Posted by be...@apache.org.
Introduced a callback for timer expiration.

This enables separating the Clock-specific functionality from the
ProcessManager functionality so that the Clock implementation can
ultimately be completely isolated.

Review: https://reviews.apache.org/r/27499


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1f2bb9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1f2bb9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1f2bb9b

Branch: refs/heads/master
Commit: b1f2bb9b7eabf74be19bcd642e94842a3a476f30
Parents: 84efa18
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:50:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp | 10 ++++
 3rdparty/libprocess/src/process.cpp           | 63 +++++++++++++---------
 2 files changed, 48 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index d60742a..ae7d0fb 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -1,6 +1,8 @@
 #ifndef __PROCESS_CLOCK_HPP__
 #define __PROCESS_CLOCK_HPP__
 
+#include <list>
+
 #include <process/time.hpp>
 #include <process/timer.hpp>
 
@@ -17,6 +19,14 @@ class Timer;
 class Clock
 {
 public:
+  // Initialize the clock with the specified callback that will be
+  // invoked whenever a batch of timers has expired.
+  //
+  // TODO(benh): Introduce a "channel" or listener pattern for getting
+  // the expired Timers rather than passing in a callback. This might
+  // mean we don't need 'initialize' or 'shutdown'.
+  static void initialize(lambda::function<void(std::list<Timer>&&)>&& callback);
+
   static Time now();
   static Time now(ProcessBase* process);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e7e5520..9551d99 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -526,9 +526,18 @@ bool paused = false;
 // to settle (and we're paused).
 bool settling = false;
 
+// Lambda function to invoke when timers have expired.
+lambda::function<void(list<Timer>&&)> callback;
+
 } // namespace clock {
 
 
+// Stores the callback that the clock invokes with each batch of
+// expired timers.
+void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
+{
+  // 'callback' is a named rvalue reference (i.e., an lvalue), so it
+  // must be moved explicitly; otherwise the function object and
+  // everything it captured would be copied.
+  clock::callback = std::move(callback);
+}
+
+
 Time Clock::now()
 {
   return now(__process__);
@@ -890,7 +899,7 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 
 void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 {
-  list<Timer> timedout;
+  list<Timer> timers;
 
   synchronized (timeouts) {
     Time now = Clock::now();
@@ -912,7 +921,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
       }
 
       foreach (const Timer& timer, (*timeouts)[timeout]) {
-        timedout.push_back(timer);
+        timers.push_back(timer);
       }
     }
 
@@ -948,29 +957,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
     update_timer = false; // Since we might have a queued update_timer.
   }
 
-  // Update current time of process (if it's present/valid). It might
-  // be necessary to actually add some more synchronization around
-  // this so that, for example, pausing and resuming the clock doesn't
-  // cause some processes to get thier current times updated and
-  // others not. Since ProcessManager::use acquires the 'processes'
-  // lock we had to move this out of the synchronized (timeouts) above
-  // since there was a deadlock with acquring 'processes' then
-  // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
-  // current time may be greater than the timeout if a local message
-  // was received (and happens-before kicks in).
-  if (Clock::paused()) {
-    foreach (const Timer& timer, timedout) {
-      if (ProcessReference process = process_manager->use(timer.creator())) {
-        Clock::update(process, timer.timeout().time());
-      }
-    }
-  }
-
-  // Invoke the timers that timed out (TODO(benh): Do this
-  // asynchronously so that we don't tie up the event thread!).
-  foreach (const Timer& timer, timedout) {
-    timer();
-  }
+  clock::callback(std::move(timers));
 
   // Mark 'settling' as false since there are not any more timeouts
   // that will expire before the paused time and we've finished
@@ -1412,6 +1399,27 @@ void* schedule(void* arg)
 }
 
 
+// Handles a batch of expired timers handed over by the clock.
+//
+// If the clock is paused, each timer's creating process (if it still
+// exists) first has its clock advanced to that timer's timeout. Then
+// every timer is invoked synchronously, in list order.
+//
+// NOTE: this dereferences the global 'process_manager', so it must
+// not run after that object has been deleted.
+void timedout(list<Timer>&& timers)
+{
+  // Update current time of process (if it's present/valid). Note that
+  // current time may be greater than the timeout if a local message
+  // was received (and happens-before kicks in).
+  if (Clock::paused()) {
+    foreach (const Timer& timer, timers) {
+      if (ProcessReference process = process_manager->use(timer.creator())) {
+        Clock::update(process, timer.timeout().time());
+      }
+    }
+  }
+
+  // Invoke the timers that timed out (TODO(benh): Do this
+  // asynchronously so that we don't tie up the event thread!).
+  foreach (const Timer& timer, timers) {
+    timer();
+  }
+}
+
+
 // We might find value in catching terminating signals at some point.
 // However, for now, adding signal handlers freely is not allowed
 // because they will clash with Java and Python virtual machines and
@@ -1484,6 +1492,8 @@ void initialize(const string& delegate)
   process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
 
+  Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
   // Setup processing threads.
   // We create no fewer than 8 threads because some tests require
   // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
@@ -1701,6 +1711,9 @@ void initialize(const string& delegate)
 void finalize()
 {
   delete process_manager;
+
+  // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt
+  // to dereference 'process_manager' in the 'timedout' callback.
 }
 
 


[13/30] mesos git commit: Moved event loop specific polling into poll.cpp.

Posted by be...@apache.org.
Moved event loop specific polling into poll.cpp.

Review: https://reviews.apache.org/r/27505


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/413ce94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/413ce94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/413ce94f

Branch: refs/heads/master
Commit: 413ce94f8d74b8c29657eef7dbc2f6ade4143bc7
Parents: f78ae66
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:22:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |   1 +
 3rdparty/libprocess/src/poll.cpp    | 129 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 112 ---------------------------
 3 files changed, 130 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2de9989..41c3bd1 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES =		\
   src/libev.cpp			\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
+  src/poll.cpp			\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll.cpp b/3rdparty/libprocess/src/poll.cpp
new file mode 100644
index 0000000..324e4dd
--- /dev/null
+++ b/3rdparty/libprocess/src/poll.cpp
@@ -0,0 +1,129 @@
+#include <ev.h>
+
+#include <process/future.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Data necessary for polling so we can discard polling and actually
+// stop it in the event loop. Allocated by 'internal::poll' and deleted
+// by whichever of 'polled' or 'discard_poll' completes the promise.
+struct Poll
+{
+  Poll()
+  {
+    // Need to explicitly instantiate the watchers.
+    watcher.io.reset(new ev_io());
+    watcher.async.reset(new ev_async());
+  }
+
+  // An I/O watcher for checking for readability or writability and
+  // an async watcher for being able to discard the polling. These are
+  // held by shared_ptr so the discard callback registered in
+  // 'internal::poll' can keep the async watcher alive after 'Poll'
+  // itself has been deleted.
+  struct {
+    memory::shared_ptr<ev_io> io;
+    memory::shared_ptr<ev_async> async;
+  } watcher;
+
+  // Set with the ready events by 'polled', or discarded by 'discard_poll'.
+  Promise<short> promise;
+};
+
+
+// Event loop callback when I/O is ready on polling file descriptor.
+// Stops both watchers, sets the promise to the ready events
+// ('revents'), and frees the 'Poll' stored in the watcher's data.
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  // Stop the async watcher (also clears if pending so 'discard_poll'
+  // will not get invoked and we can delete 'poll' here).
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  // NOTE(review): 'revents' is an int stored into a Promise<short>.
+  poll->promise.set(revents);
+
+  delete poll;
+}
+
+
+// Event loop callback when future associated with polling file
+// descriptor has been discarded. Stops both watchers, discards the
+// promise, and frees the 'Poll'. The exception is when the I/O watcher
+// already has a pending callback; in that case it does nothing and
+// leaves 'polled' to complete and free the 'Poll'.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+  Poll* poll = (Poll*) watcher->data;
+
+  // Check and see if we have a pending 'polled' callback and if so
+  // let it "win".
+  if (ev_is_pending(poll->watcher.io.get())) {
+    return;
+  }
+
+  ev_async_stop(loop, poll->watcher.async.get());
+
+  // Stop the I/O watcher (but note we check if pending above) so it
+  // won't get invoked and we can delete 'poll' here.
+  ev_io_stop(loop, poll->watcher.io.get());
+
+  poll->promise.discard();
+
+  delete poll;
+}
+
+
+namespace io {
+namespace internal {
+
+// Helper/continuation of 'poll' on future discard. Signals the async
+// watcher so that the event loop runs 'discard_poll'. It takes the
+// watcher by shared_ptr, rather than the 'Poll', because the 'Poll'
+// may already have been deleted by the time the discard happens.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+  ev_async_send(loop, async.get());
+}
+
+
+// Starts watching 'fd' for 'events'. Returns a future that is set with
+// the ready events, or discarded if the caller discards it first.
+// Intended to run on the event loop; the public 'io::poll' dispatches
+// it there via 'run_in_event_loop'.
+Future<short> poll(int fd, short events)
+{
+  Poll* poll = new Poll();
+
+  // Have the watchers data point back to the struct.
+  poll->watcher.async->data = poll;
+  poll->watcher.io->data = poll;
+
+  // Get a copy of the future to avoid any races with the event loop.
+  Future<short> future = poll->promise.future();
+
+  // Initialize and start the async watcher.
+  ev_async_init(poll->watcher.async.get(), discard_poll);
+  ev_async_start(loop, poll->watcher.async.get());
+
+  // Make sure we stop polling if a discard occurs on our future.
+  // Note that it's possible that we'll invoke '_poll' when someone
+  // does a discard even after the polling has already completed, but
+  // in this case while we will interrupt the event loop since the
+  // async watcher has already been stopped we won't cause
+  // 'discard_poll' to get invoked.
+  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+  // Initialize and start the I/O watcher.
+  ev_io_init(poll->watcher.io.get(), polled, fd, events);
+  ev_io_start(loop, poll->watcher.io.get());
+
+  return future;
+}
+
+} // namespace internal {
+
+
+// Public entry point for polling 'fd' for 'events'. Ensures libprocess
+// is initialized, then runs 'internal::poll' on the event loop and
+// returns its future.
+Future<short> poll(int fd, short events)
+{
+  process::initialize();
+
+  // TODO(benh): Check if the file descriptor is non-blocking?
+
+  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d96d881..1fc8874 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -927,69 +927,6 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-// Data necessary for polling so we can discard polling and actually
-// stop it in the event loop.
-struct Poll
-{
-  Poll()
-  {
-    // Need to explicitly instantiate the watchers.
-    watcher.io.reset(new ev_io());
-    watcher.async.reset(new ev_async());
-  }
-
-  // An I/O watcher for checking for readability or writeability and
-  // an async watcher for being able to discard the polling.
-  struct {
-    memory::shared_ptr<ev_io> io;
-    memory::shared_ptr<ev_async> async;
-  } watcher;
-
-  Promise<short> promise;
-};
-
-
-// Event loop callback when I/O is ready on polling file descriptor.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  // Stop the async watcher (also clears if pending so 'discard_poll'
-  // will not get invoked and we can delete 'poll' here).
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  poll->promise.set(revents);
-
-  delete poll;
-}
-
-
-// Event loop callback when future associated with polling file
-// descriptor has been discarded.
-void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  // Check and see if we have a pending 'polled' callback and if so
-  // let it "win".
-  if (ev_is_pending(poll->watcher.io.get())) {
-    return;
-  }
-
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  // Stop the I/O watcher (but note we check if pending above) so it
-  // won't get invoked and we can delete 'poll' here.
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  poll->promise.discard();
-
-  delete poll;
-}
-
-
 void* serve(void* arg)
 {
   ev_loop(((struct ev_loop*) arg), 0);
@@ -3179,47 +3116,8 @@ void post(const UPID& from,
 
 
 namespace io {
-
 namespace internal {
 
-// Helper/continuation of 'poll' on future discard.
-void _poll(const memory::shared_ptr<ev_async>& async)
-{
-  ev_async_send(loop, async.get());
-}
-
-
-Future<short> poll(int fd, short events)
-{
-  Poll* poll = new Poll();
-
-  // Have the watchers data point back to the struct.
-  poll->watcher.async->data = poll;
-  poll->watcher.io->data = poll;
-
-  // Get a copy of the future to avoid any races with the event loop.
-  Future<short> future = poll->promise.future();
-
-  // Initialize and start the async watcher.
-  ev_async_init(poll->watcher.async.get(), discard_poll);
-  ev_async_start(loop, poll->watcher.async.get());
-
-  // Make sure we stop polling if a discard occurs on our future.
-  // Note that it's possible that we'll invoke '_poll' when someone
-  // does a discard even after the polling has already completed, but
-  // in this case while we will interrupt the event loop since the
-  // async watcher has already been stopped we won't cause
-  // 'discard_poll' to get invoked.
-  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
-
-  // Initialize and start the I/O watcher.
-  ev_io_init(poll->watcher.io.get(), polled, fd, events);
-  ev_io_start(loop, poll->watcher.io.get());
-
-  return future;
-}
-
-
 void read(
     int fd,
     void* data,
@@ -3366,16 +3264,6 @@ void write(
 } // namespace internal {
 
 
-Future<short> poll(int fd, short events)
-{
-  process::initialize();
-
-  // TODO(benh): Check if the file descriptor is non-blocking?
-
-  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
-}
-
-
 Future<size_t> read(int fd, void* data, size_t size)
 {
   process::initialize();


[15/30] mesos git commit: Used io::poll instead of libev for send*.

Posted by be...@apache.org.
Used io::poll instead of libev for send*.

Updated sending_connect, send_data, and send_file.

Review: https://reviews.apache.org/r/27510


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b774ecb2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b774ecb2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b774ecb2

Branch: refs/heads/master
Commit: b774ecb245288c2a0a33b94e9c92ed0ee806b9c1
Parents: bc23da1
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 21:21:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:26:49 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp |  9 ++--
 3rdparty/libprocess/src/process.cpp | 90 ++++++++++----------------------
 2 files changed, 35 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 9c5aa81..800f324 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -25,10 +25,13 @@ namespace process {
 
 const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
 
-typedef void (*Sender)(struct ev_loop*, ev_io*, int);
+// Forward declarations.
+class Encoder;
 
-extern void send_data(struct ev_loop*, ev_io*, int);
-extern void send_file(struct ev_loop*, ev_io*, int);
+extern void send_data(Encoder*);
+extern void send_file(Encoder*);
+
+typedef void (*Sender)(Encoder*);
 
 
 class Encoder

http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a33a201..ac12876 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -681,11 +681,11 @@ void ignore_data(Socket* socket, int s)
 }
 
 
-void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_data(Encoder* e)
 {
-  DataEncoder* encoder = (DataEncoder*) watcher->data;
+  DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
 
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   while (true) {
     const void* data;
@@ -703,6 +703,8 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
       encoder->backup(size);
+      io::poll(s, io::WRITE)
+        .onAny(lambda::bind(&send_data, e));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -714,8 +716,6 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
       }
       socket_manager->close(s);
       delete encoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -727,18 +727,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
       if (encoder->remaining() == 0) {
         delete encoder;
 
-        // Stop this watcher for now.
-        ev_io_stop(loop, watcher);
-
         // Check for more stuff to send on socket.
         Encoder* next = socket_manager->next(s);
         if (next != NULL) {
-          watcher->data = next;
-          ev_io_init(watcher, next->sender(), s, EV_WRITE);
-          ev_io_start(loop, watcher);
-        } else {
-          // Nothing more to send right now, clean up.
-          delete watcher;
+          io::poll(s, io::WRITE)
+            .onAny(lambda::bind(next->sender(), next));
         }
         break;
       }
@@ -747,11 +740,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_file(Encoder* e)
 {
-  FileEncoder* encoder = (FileEncoder*) watcher->data;
+  FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e));
 
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   while (true) {
     int fd;
@@ -770,6 +763,8 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
       encoder->backup(size);
+      io::poll(s, io::WRITE)
+        .onAny(lambda::bind(&send_file, e));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -781,8 +776,6 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
       }
       socket_manager->close(s);
       delete encoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -794,18 +787,11 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
       if (encoder->remaining() == 0) {
         delete encoder;
 
-        // Stop this watcher for now.
-        ev_io_stop(loop, watcher);
-
         // Check for more stuff to send on socket.
         Encoder* next = socket_manager->next(s);
         if (next != NULL) {
-          watcher->data = next;
-          ev_io_init(watcher, next->sender(), s, EV_WRITE);
-          ev_io_start(loop, watcher);
-        } else {
-          // Nothing more to send right now, clean up.
-          delete watcher;
+          io::poll(s, io::WRITE)
+            .onAny(lambda::bind(next->sender(), next));
         }
         break;
       }
@@ -814,9 +800,9 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void sending_connect(Encoder* encoder)
 {
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   // Now check that a successful connection was made.
   int opt;
@@ -826,15 +812,11 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
     socket_manager->close(s);
-    MessageEncoder* encoder = (MessageEncoder*) watcher->data;
     delete encoder;
-    ev_io_stop(loop, watcher);
-    delete watcher;
   } else {
     // We're connected! Now let's do some sending.
-    ev_io_stop(loop, watcher);
-    ev_io_init(watcher, send_data, s, EV_WRITE);
-    ev_io_start(loop, watcher);
+    io::poll(s, io::WRITE)
+      .onAny(lambda::bind(&send_data, encoder));
   }
 }
 
@@ -1640,17 +1622,9 @@ void SocketManager::send(Encoder* encoder, bool persist)
         // Initialize the outgoing queue.
         outgoing[encoder->socket()];
 
-        // Allocate and initialize the watcher.
-        ev_io* watcher = new ev_io();
-        watcher->data = encoder;
-
-        ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
-
-        synchronized (watchers) {
-          watchers->push(watcher);
-        }
-
-        ev_async_send(loop, &async_watcher);
+        // Start polling in order to send with this encoder.
+        io::poll(encoder->socket(), io::WRITE)
+          .onAny(lambda::bind(encoder->sender(), encoder));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
@@ -1728,9 +1702,8 @@ void SocketManager::send(Message* message)
       io::poll(s, io::READ)
         .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
 
-      // Allocate and initialize a watcher for sending the message.
-      ev_io* watcher = new ev_io();
-      watcher->data = new MessageEncoder(sockets[s], message);
+      // Create a message encoder to handle sending this message.
+      Encoder* encoder = new MessageEncoder(sockets[s], message);
 
       // Try and connect to the node using this socket.
       sockaddr_in addr;
@@ -1744,19 +1717,14 @@ void SocketManager::send(Message* message)
           PLOG(FATAL) << "Failed to send, connect";
         }
 
-        // Initialize watcher for connecting.
-        ev_io_init(watcher, sending_connect, s, EV_WRITE);
+        // Start polling in order to wait for being connected.
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&sending_connect, encoder));
       } else {
-        // Initialize watcher for sending.
-        ev_io_init(watcher, send_data, s, EV_WRITE);
-      }
-
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
+        // Start polling in order to send data.
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&send_data, encoder));
       }
-
-      ev_async_send(loop, &async_watcher);
     }
   }
 }


[27/30] mesos git commit: Introduce Socket::send() and Socket::sendfile().

Posted by be...@apache.org.
Introduce Socket::send() and Socket::sendfile().

Also used these when linking sockets.

Review: https://reviews.apache.org/r/27964


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/71de11e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/71de11e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/71de11e9

Branch: refs/heads/master
Commit: 71de11e95f1945031e3ea1308445eddc0850c2be
Parents: 3564c44
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:49:44 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  14 ++
 3rdparty/libprocess/src/process.cpp            | 180 +++++++++++++++++++-
 2 files changed, 186 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index a7c22a0..5fd8d1b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -78,6 +78,10 @@ public:
 
     Future<size_t> read(char* data, size_t size);
 
+    Future<size_t> send(const char* data, size_t size);
+
+    Future<size_t> sendfile(int fd, off_t offset, size_t size);
+
   private:
     const Impl& create() const
     {
@@ -129,6 +133,16 @@ public:
     return impl->read(data, size);
   }
 
+  Future<size_t> send(const char* data, size_t size) const
+  {
+    return impl->send(data, size);
+  }
+
+  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+  {
+    return impl->sendfile(fd, offset, size);
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 63f9df4..b062b85 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1465,6 +1465,96 @@ Future<size_t> Socket::Impl::read(char* data, size_t size)
 }
 
 
+namespace internal {
+
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, s, data, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket send failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = os::sendfile(s, fd, offset, size);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket sendfile failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+} // namespace internal {
+
+
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1600,27 +1690,101 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
 }
 
 
+namespace internal {
+
+void _send(
+    const Future<size_t>& result,
+    Socket* socket,
+    Encoder* encoder,
+    size_t size);
+
+
+void send(Encoder* encoder, Socket* socket)
+{
+  switch (encoder->kind()) {
+    case Encoder::DATA: {
+      size_t size;
+      const char* data = reinterpret_cast<DataEncoder*>(encoder)->next(&size);
+      socket->send(data, size)
+        .onAny(lambda::bind(
+            &internal::_send,
+            lambda::_1,
+            socket,
+            encoder,
+            size));
+      break;
+    }
+    case Encoder::FILE: {
+      off_t offset;
+      size_t size;
+      int fd = reinterpret_cast<FileEncoder*>(encoder)->next(&offset, &size);
+      socket->sendfile(fd, offset, size)
+        .onAny(lambda::bind(
+            &internal::_send,
+            lambda::_1,
+            socket,
+            encoder,
+            size));
+      break;
+    }
+  }
+}
+
+
+void _send(
+    const Future<size_t>& length,
+    Socket* socket,
+    Encoder* encoder,
+    size_t size)
+{
+  if (length.isDiscarded() || length.isFailed()) {
+    socket_manager->close(*socket);
+    delete socket;
+    delete encoder;
+  } else {
+    // Update the encoder with the amount sent.
+    encoder->backup(size - length.get());
+
+    // See if there is any more of the message to send.
+    if (encoder->remaining() == 0) {
+      delete encoder;
+
+      // Check for more stuff to send on socket.
+      Encoder* next = socket_manager->next(*socket);
+      if (next != NULL) {
+        send(next, socket);
+      } else {
+        delete socket;
+      }
+    } else {
+      send(encoder, socket);
+    }
+  }
+}
+
+} // namespace internal {
+
+
 void SocketManager::send(Encoder* encoder, bool persist)
 {
   CHECK(encoder != NULL);
 
   synchronized (this) {
-    if (sockets.count(encoder->socket()) > 0) {
+    Socket socket = encoder->socket();
+    if (sockets.count(socket) > 0) {
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
       if (!persist) {
-        dispose.insert(encoder->socket());
+        dispose.insert(socket);
       }
 
-      if (outgoing.count(encoder->socket()) > 0) {
-        outgoing[encoder->socket()].push(encoder);
+      if (outgoing.count(socket) > 0) {
+        outgoing[socket].push(encoder);
       } else {
         // Initialize the outgoing queue.
-        outgoing[encoder->socket()];
+        outgoing[socket];
 
-        // Start polling in order to send with this encoder.
-        io::poll(encoder->socket(), io::WRITE)
-          .onAny(lambda::bind(encoder->sender(), encoder));
+        internal::send(encoder, new Socket(socket));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";


[06/30] mesos git commit: Simplified redundant Clock::order/update usage.

Posted by be...@apache.org.
Simplified redundant Clock::order/update usage.

Review: https://reviews.apache.org/r/27501


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1ef91fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1ef91fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1ef91fb

Branch: refs/heads/master
Commit: e1ef91fb0c711d6bf69e72e468a2c2c55684e07f
Parents: 6df6d02
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 16:18:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp | 16 ++++++-
 3rdparty/libprocess/src/process.cpp           | 51 +++++-----------------
 2 files changed, 25 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1ef91fb/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index ae7d0fb..db0fb04 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -45,7 +45,21 @@ public:
   static void advance(ProcessBase* process, const Duration& duration);
 
   static void update(const Time& time);
-  static void update(ProcessBase* process, const Time& time);
+
+  // When updating the time of a particular process you can specify
+  // whether or not you want to override the existing value even if
+  // you're going backwards in time! SAFE means don't update the
+  // previous Clock for a process if going backwards in time, where as
+  // FORCE forces this change.
+  enum Update {
+    SAFE,
+    FORCE,
+  };
+
+  static void update(
+      ProcessBase* process,
+      const Time& time,
+      Update update = SAFE);
 
   static void order(ProcessBase* from, ProcessBase* to);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1ef91fb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0995a9c..2282e9b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -663,11 +663,11 @@ void Clock::update(const Time& time)
 }
 
 
-void Clock::update(ProcessBase* process, const Time& time)
+void Clock::update(ProcessBase* process, const Time& time, Update update)
 {
   synchronized (timeouts) {
     if (clock::paused) {
-      if (now(process) < time) {
+      if (now(process) < time || update == Clock::FORCE) {
         VLOG(2) << "Clock of " << process->self() << " updated to " << time;
         (*clock::currents)[process] = Time(time);
       }
@@ -2644,15 +2644,7 @@ bool ProcessManager::deliver(
   // the duration of this routine (so that we can look up it's current
   // time).
   if (Clock::paused()) {
-    synchronized (timeouts) {
-      if (Clock::paused()) {
-        if (sender != NULL) {
-          Clock::order(sender, receiver);
-        } else {
-          Clock::update(receiver, Clock::now());
-        }
-      }
-    }
+    Clock::update(receiver, Clock::now(sender != NULL ? sender : __process__));
   }
 
   receiver->enqueue(event);
@@ -2948,15 +2940,7 @@ void ProcessManager::terminate(
 {
   if (ProcessReference process = use(pid)) {
     if (Clock::paused()) {
-      synchronized (timeouts) {
-        if (Clock::paused()) {
-          if (sender != NULL) {
-            Clock::order(sender, process);
-          } else {
-            Clock::update(process, Clock::now());
-          }
-        }
-      }
+      Clock::update(process, Clock::now(sender != NULL ? sender : __process__));
     }
 
     if (sender != NULL) {
@@ -3293,18 +3277,10 @@ ProcessBase::ProcessBase(const string& id)
   pid.node = __node__;
 
   // If using a manual clock, try and set current time of process
-  // using happens before relationship between creator and createe!
+  // using happens before relationship between creator (__process__)
+  // and createe (this)!
   if (Clock::paused()) {
-    synchronized (timeouts) {
-      if (Clock::paused()) {
-        clock::currents->erase(this); // In case the address is reused!
-        if (__process__ != NULL) {
-          Clock::order(__process__, this);
-        } else {
-          Clock::update(this, Clock::now());
-        }
-      }
-    }
+    Clock::update(this, Clock::now(__process__), Clock::FORCE);
   }
 }
 
@@ -3512,17 +3488,10 @@ UPID spawn(ProcessBase* process, bool manage)
 
   if (process != NULL) {
     // If using a manual clock, try and set current time of process
-    // using happens before relationship between spawner and spawnee!
+    // using happens before relationship between spawner (__process__)
+    // and spawnee (process)!
     if (Clock::paused()) {
-      synchronized (timeouts) {
-        if (Clock::paused()) {
-          if (__process__ != NULL) {
-            Clock::order(__process__, process);
-          } else {
-            Clock::update(process, Clock::now());
-          }
-        }
-      }
+      Clock::update(process, Clock::now(__process__));
     }
 
     return process_manager->spawn(process, manage);


[09/30] mesos git commit: Used io::poll instead of libev for ignore_data.

Posted by be...@apache.org.
Used io::poll instead of libev for ignore_data.

Review: https://reviews.apache.org/r/27508


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f94d9a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f94d9a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f94d9a8

Branch: refs/heads/master
Commit: 9f94d9a8a42352345bffb0f1be0253952731b316
Parents: 4751167
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:43:29 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 54 ++++++++++++++------------------
 1 file changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f94d9a8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 528cb88..bc2884b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -644,12 +644,8 @@ void recv_data(DataDecoder* decoder, int s)
 // SocketManager::send(Message) where we don't care about the data
 // received we mostly just want to know when the socket has been
 // closed.
-void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void ignore_data(Socket* socket, int s)
 {
-  Socket* socket = (Socket*) watcher->data;
-
-  int s = watcher->fd;
-
   while (true) {
     const ssize_t size = 80 * 1024;
     ssize_t length = 0;
@@ -663,6 +659,8 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
       continue;
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&ignore_data, socket, s));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -673,9 +671,7 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
         VLOG(2) << "Socket closed while receiving";
       }
       socket_manager->close(s);
-      ev_io_stop(loop, watcher);
       delete socket;
-      delete watcher;
       break;
     } else {
       VLOG(2) << "Ignoring " << length << " bytes of data received "
@@ -861,9 +857,11 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     delete watcher;
   } else {
     // We're connected! Now let's do some receiving.
+    Socket* socket = (Socket*) watcher->data;
     ev_io_stop(loop, watcher);
-    ev_io_init(watcher, ignore_data, s, EV_READ);
-    ev_io_start(loop, watcher);
+    delete watcher;
+    io::poll(s, io::READ)
+      .onAny(lambda::bind(&ignore_data, socket, s));
   }
 }
 
@@ -1591,17 +1589,18 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
         // Wait for socket to be connected.
         ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-      } else {
-        ev_io_init(watcher, ignore_data, s, EV_READ);
-      }
 
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
-      }
+        // Enqueue the watcher.
+        synchronized (watchers) {
+          watchers->push(watcher);
+        }
 
-      // Interrupt the loop.
-      ev_async_send(loop, &async_watcher);
+        // Interrupt the loop.
+        ev_async_send(loop, &async_watcher);
+      } else {
+        io::poll(s, io::READ)
+          .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
+      }
     }
 
     links[to].insert(process);
@@ -1741,21 +1740,14 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      // Allocate and initialize a watcher for reading data from this
-      // socket. Note that we don't expect to receive anything other
-      // than HTTP '202 Accepted' responses which we anyway ignore.
-      ev_io* watcher = new ev_io();
-      watcher->data = new Socket(sockets[s]);
-
-      ev_io_init(watcher, ignore_data, s, EV_READ);
-
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
-      }
+      // Read and ignore data from this socket. Note that we don't
+      // expect to receive anything other than HTTP '202 Accepted'
+      // responses which we just ignore.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
 
       // Allocate and initialize a watcher for sending the message.
-      watcher = new ev_io();
+      ev_io* watcher = new ev_io();
       watcher->data = new MessageEncoder(sockets[s], message);
 
       // Try and connect to the node using this socket.


[14/30] mesos git commit: Used io::poll instead of libev for recv_data.

Posted by be...@apache.org.
Used io::poll instead of libev for recv_data.

Review: https://reviews.apache.org/r/27507


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47511670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47511670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47511670

Branch: refs/heads/master
Commit: 47511670aaad85ab068902151c9c3f84573fbc99
Parents: 37bba65
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:30:48 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 23 ++++++-----------------
 1 file changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/47511670/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index aeaac0c..528cb88 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,12 +589,8 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void recv_data(DataDecoder* decoder, int s)
 {
-  DataDecoder* decoder = (DataDecoder*) watcher->data;
-
-  int s = watcher->fd;
-
   while (true) {
     const ssize_t size = 80 * 1024;
     ssize_t length = 0;
@@ -608,6 +604,8 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
       continue;
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&recv_data, decoder, s));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -619,8 +617,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
       }
       socket_manager->close(s);
       delete decoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -636,8 +632,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
         VLOG(1) << "Decoder error while receiving";
         socket_manager->close(s);
         delete decoder;
-        ev_io_stop(loop, watcher);
-        delete watcher;
         break;
       }
     }
@@ -913,14 +907,9 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
     // Inform the socket manager for proper bookkeeping.
     const Socket& socket = socket_manager->accepted(s);
 
-    // Allocate and initialize the decoder and watcher.
-    DataDecoder* decoder = new DataDecoder(socket);
-
-    ev_io* watcher = new ev_io();
-    watcher->data = decoder;
-
-    ev_io_init(watcher, recv_data, s, EV_READ);
-    ev_io_start(loop, watcher);
+    // Start reading from the socket.
+    io::poll(s, io::READ)
+      .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
   }
 }
 


[17/30] mesos git commit: Fix race condition in process.cpp.

Posted by be...@apache.org.
Fix race condition in process.cpp.

Fix race condition between encoder creation and io::poll in
SocketManager::send(Message).

Review: https://reviews.apache.org/r/27956


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fcdae270
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fcdae270
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fcdae270

Branch: refs/heads/master
Commit: fcdae2704dbe6cd04c1f705497ae1113cb29e039
Parents: d1d4340
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:29:26 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:29:27 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fcdae270/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index ac12876..48e5486 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1696,15 +1696,15 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
+      // Create a message encoder to handle sending this message.
+      Encoder* encoder = new MessageEncoder(sockets[s], message);
+
       // Read and ignore data from this socket. Note that we don't
       // expect to receive anything other than HTTP '202 Accepted'
       // responses which we just ignore.
       io::poll(s, io::READ)
         .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
 
-      // Create a message encoder to handle sending this message.
-      Encoder* encoder = new MessageEncoder(sockets[s], message);
-
       // Try and connect to the node using this socket.
       sockaddr_in addr;
       memset(&addr, 0, sizeof(addr));


[23/30] mesos git commit: Introduce std::enable_shared_from_this configure check in libprocess.

Posted by be...@apache.org.
Introduce std::enable_shared_from_this configure check in libprocess.

Review: https://reviews.apache.org/r/27351


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/702b3824
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/702b3824
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/702b3824

Branch: refs/heads/master
Commit: 702b38240b9a596a4b46983da96ae9ca1c052b5d
Parents: 075859d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:36:33 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/702b3824/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
index d48a96e..251213a 100644
--- a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
@@ -81,6 +81,19 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
       // End scope of uniqueLock.
     }
   }
+
+  // Check for std::enable_shared_from_this.
+  struct SharedStruct : public std::enable_shared_from_this<SharedStruct>
+  {
+    std::shared_ptr<SharedStruct> get()
+    {
+      return shared_from_this();
+    }
+  };
+
+  // Construct a new shared_ptr using shared_from_this().
+  std::shared_ptr<SharedStruct> object =
+    std::shared_ptr<SharedStruct>(new SharedStruct())->get();
 ])
 
 AC_DEFUN([AX_CXX_COMPILE_STDCXX_11], [


[24/30] mesos git commit: Remove dead code in process.cpp.

Posted by be...@apache.org.
Remove dead code in process.cpp.

Review: https://reviews.apache.org/r/27959


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/149164bb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/149164bb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/149164bb

Branch: refs/heads/master
Commit: 149164bb477ac5e8065e12f00124f6154a1180fe
Parents: 4d616f8
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:46:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 40 --------------------------------
 1 file changed, 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/149164bb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 6916cbb..73d41c9 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -800,46 +800,6 @@ void send_file(Encoder* e)
 }
 
 
-void sending_connect(Encoder* encoder)
-{
-  int s = encoder->socket();
-
-  // Now check that a successful connection was made.
-  int opt;
-  socklen_t optlen = sizeof(opt);
-
-  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
-    // Connect failure.
-    VLOG(1) << "Socket error while connecting";
-    socket_manager->close(s);
-    delete encoder;
-  } else {
-    // We're connected! Now let's do some sending.
-    io::poll(s, io::WRITE)
-      .onAny(lambda::bind(&send_data, encoder));
-  }
-}
-
-
-void receiving_connect(Socket* socket, int s)
-{
-  // Now check that a successful connection was made.
-  int opt;
-  socklen_t optlen = sizeof(opt);
-
-  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
-    // Connect failure.
-    VLOG(1) << "Socket error while connecting";
-    socket_manager->close(s);
-    delete socket;
-  } else {
-    // We're connected! Now let's do some receiving.
-    io::poll(s, io::READ)
-      .onAny(lambda::bind(&ignore_data, socket, s));
-  }
-}
-
-
 void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   CHECK_EQ(__s__, watcher->fd);


[18/30] mesos git commit: Add read() to Socket interface.

Posted by be...@apache.org.
Add read() to Socket interface.

Also used it when accepting connections in libprocess.

Review: https://reviews.apache.org/r/27960


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/383f5479
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/383f5479
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/383f5479

Branch: refs/heads/master
Commit: 383f5479d71237c033acdda678571a1ad0b993b1
Parents: 149164b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:46:34 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |   7 ++
 3rdparty/libprocess/src/process.cpp            | 133 +++++++++++---------
 2 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index fdad91f..a7c22a0 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -76,6 +76,8 @@ public:
 
     Future<Socket> connect(const Node& node);
 
+    Future<size_t> read(char* data, size_t size);
+
   private:
     const Impl& create() const
     {
@@ -122,6 +124,11 @@ public:
     return impl->connect(node);
   }
 
+  Future<size_t> read(char* data, size_t size) const
+  {
+    return impl->read(data, size);
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 73d41c9..c37d522 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,56 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-void recv_data(DataDecoder* decoder, int s)
-{
-  while (true) {
-    const ssize_t size = 80 * 1024;
-    ssize_t length = 0;
-
-    char data[size];
-
-    length = recv(s, data, size, 0);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&recv_data, decoder, s));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while receiving: " << error;
-      } else {
-        VLOG(2) << "Socket closed while receiving";
-      }
-      socket_manager->close(s);
-      delete decoder;
-      break;
-    } else {
-      CHECK(length > 0);
-
-      // Decode as much of the data as possible into HTTP requests.
-      const deque<Request*>& requests = decoder->decode(data, length);
-
-      if (!requests.empty()) {
-        foreach (Request* request, requests) {
-          process_manager->handle(decoder->socket(), request);
-        }
-      } else if (requests.empty() && decoder->failed()) {
-        VLOG(1) << "Decoder error while receiving";
-        socket_manager->close(s);
-        delete decoder;
-        break;
-      }
-    }
-  }
-}
-
-
 // A variant of 'recv_data' that doesn't do anything with the
 // data. Used by sockets created via SocketManager::link as well as
 // SocketManager::send(Message) where we don't care about the data
@@ -800,6 +750,57 @@ void send_file(Encoder* e)
 }
 
 
+namespace internal {
+
+void decode_read(
+    const Future<size_t>& length,
+    char* data,
+    size_t size,
+    Socket* socket,
+    DataDecoder* decoder)
+{
+  if (length.isDiscarded() || length.isFailed()) {
+    if (length.isFailed()) {
+      VLOG(1) << "Decode failure: " << length.failure();
+    }
+
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+
+  if (length.get() == 0) {
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+  // Decode as much of the data as possible into HTTP requests.
+  const deque<Request*>& requests = decoder->decode(data, length.get());
+
+  if (!requests.empty()) {
+    foreach (Request* request, requests) {
+      process_manager->handle(decoder->socket(), request);
+    }
+  } else if (requests.empty() && decoder->failed()) {
+    VLOG(1) << "Decoder error while receiving";
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+
+  socket->read(data, size)
+    .onAny(lambda::bind(&decode_read, lambda::_1, data, size, socket, decoder));
+}
+
+} // namespace internal {
+
+
 void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   CHECK_EQ(__s__, watcher->fd);
@@ -837,11 +838,25 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
     os::close(s);
   } else {
     // Inform the socket manager for proper bookkeeping.
-    const Socket& socket = socket_manager->accepted(s);
+    Socket socket = socket_manager->accepted(s);
+
+    // Allocate a buffer to read into. This can be replaced later
+    // when socket supports a read function that provides the
+    // buffered data in the resulting callback.
+    const size_t size = 80 * 1024;
+    char* data = new char[size];
+    memset(data, 0, size);
+
+    DataDecoder* decoder = new DataDecoder(socket);
 
-    // Start reading from the socket.
-    io::poll(s, io::READ)
-      .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
+    socket.read(data, size)
+      .onAny(lambda::bind(
+          &internal::decode_read,
+          lambda::_1,
+          data,
+          size,
+          new Socket(socket),
+          decoder));
   }
 }
 
@@ -1486,6 +1501,12 @@ Future<Socket> Socket::Impl::connect(const Node& node)
 }
 
 
+Future<size_t> Socket::Impl::read(char* data, size_t size)
+{
+  return io::read(get(), data, size);
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1711,8 +1732,8 @@ Encoder* SocketManager::next(int s)
     // We cannot assume 'sockets.count(s) > 0' here because it's
     // possible that 's' has been removed with a a call to
     // SocketManager::close. For example, it could be the case that a
-    // socket has gone to CLOSE_WAIT and the call to 'recv' in
-    // recv_data returned 0 causing SocketManager::close to get
+    // socket has gone to CLOSE_WAIT and the call to read in
+    // io::read returned 0 causing SocketManager::close to get
     // invoked. Later a call to 'send' or 'sendfile' (e.g., in
     // send_data or send_file) can "succeed" (because the socket is
     // not "closed" yet because there are still some Socket


[04/30] mesos git commit: Used io::poll instead of libev for receiving_connect.

Posted by be...@apache.org.
Used io::poll instead of libev for receiving_connect.

Review: https://reviews.apache.org/r/27509


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bc23da1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bc23da1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bc23da1b

Branch: refs/heads/master
Commit: bc23da1b04aeda9d35ee2f9952fc82248b1ff313
Parents: 9f94d9a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:49:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 36 ++++++++------------------------
 1 file changed, 9 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bc23da1b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc2884b..a33a201 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -839,10 +839,8 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void receiving_connect(Socket* socket, int s)
 {
-  int s = watcher->fd;
-
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
@@ -851,15 +849,9 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
     socket_manager->close(s);
-    Socket* socket = (Socket*) watcher->data;
     delete socket;
-    ev_io_stop(loop, watcher);
-    delete watcher;
   } else {
     // We're connected! Now let's do some receiving.
-    Socket* socket = (Socket*) watcher->data;
-    ev_io_stop(loop, watcher);
-    delete watcher;
     io::poll(s, io::READ)
       .onAny(lambda::bind(&ignore_data, socket, s));
   }
@@ -1567,15 +1559,12 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       persists[to.node] = s;
 
-      // Allocate and initialize a watcher for reading data from this
-      // socket. Note that we don't expect to receive anything other
-      // than HTTP '202 Accepted' responses which we anyway ignore.
-      // We do, however, want to react when it gets closed so we can
-      // generate appropriate lost events (since this is a 'link').
-      ev_io* watcher = new ev_io();
-      watcher->data = new Socket(sockets[s]);
-
-      // Try and connect to the node using this socket.
+      // Try and connect to the node using this socket in order to
+      // start reading data. Note that we don't expect to receive
+      // anything other than HTTP '202 Accepted' responses which we
+      // anyway ignore.  We do, however, want to react when it gets
+      // closed so we can generate appropriate lost events (since this
+      // is a 'link').
       sockaddr_in addr;
       memset(&addr, 0, sizeof(addr));
       addr.sin_family = PF_INET;
@@ -1588,15 +1577,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
         }
 
         // Wait for socket to be connected.
-        ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-
-        // Enqueue the watcher.
-        synchronized (watchers) {
-          watchers->push(watcher);
-        }
-
-        // Interrupt the loop.
-        ev_async_send(loop, &async_watcher);
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
       } else {
         io::poll(s, io::READ)
           .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));


[22/30] mesos git commit: Use std::shared_ptr to do reference counting in Socket.

Posted by be...@apache.org.
Use std::shared_ptr to do reference counting in Socket.

Review: https://reviews.apache.org/r/27957


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/075859d9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/075859d9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/075859d9

Branch: refs/heads/master
Commit: 075859d984aa0b23d8efb18cc9bf0e6a17f3e00f
Parents: 9eda433
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp | 87 ++++++++++-----------
 1 file changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/075859d9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 6683881..9f4302e 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -4,6 +4,7 @@
 #include <assert.h>
 
 #include <stout/abort.hpp>
+#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
 #include <stout/try.hpp>
@@ -40,66 +41,62 @@ inline Try<int> socket(int family, int type, int protocol) {
 class Socket
 {
 public:
-  Socket()
-    : refs(new int(1)), s(-1) {}
+  class Impl
+  {
+  public:
+    Impl() : s(-1) {}
 
-  explicit Socket(int _s)
-    : refs(new int(1)), s(_s) {}
+    explicit Impl(int _s) : s(_s) {}
 
-  ~Socket()
-  {
-    cleanup();
-  }
+    ~Impl()
+    {
+      if (s >= 0) {
+        Try<Nothing> close = os::close(s);
+        if (close.isError()) {
+          ABORT(
+            "Failed to close socket " + stringify(s) + ": " + close.error());
+        }
+      }
+    }
 
-  Socket(const Socket& that)
-  {
-    copy(that);
-  }
+    int get() const
+    {
+      return s >= 0 ? s : create().get();
+    }
 
-  Socket& operator = (const Socket& that)
-  {
-    if (this != &that) {
-      cleanup();
-      copy(that);
+  private:
+    const Impl& create() const
+    {
+      CHECK(s < 0);
+      Try<int> fd =
+        process::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+      if (fd.isError()) {
+        ABORT("Failed to create socket: " + fd.error());
+      }
+      s = fd.get();
+      return *this;
     }
-    return *this;
-  }
+
+    // Mutable so that the socket can be lazily created.
+    mutable int s;
+  };
+
+  Socket() : impl(std::make_shared<Impl>()) {}
+
+  explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
 
   bool operator == (const Socket& that) const
   {
-    return s == that.s && refs == that.refs;
+    return impl == that.impl;
   }
 
   operator int () const
   {
-    return s;
+    return impl->get();
   }
 
 private:
-  void copy(const Socket& that)
-  {
-    assert(that.refs > 0);
-    __sync_fetch_and_add(that.refs, 1);
-    refs = that.refs;
-    s = that.s;
-  }
-
-  void cleanup()
-  {
-    assert(refs != NULL);
-    if (__sync_sub_and_fetch(refs, 1) == 0) {
-      delete refs;
-      if (s >= 0) {
-        Try<Nothing> close = os::close(s);
-        if (close.isError()) {
-          ABORT("Failed to close socket: " + close.error());
-        }
-      }
-    }
-  }
-
-  int* refs;
-  int s;
+  memory::shared_ptr<Impl> impl;
 };
 
 } // namespace process {


[30/30] mesos git commit: Use Socket::send() for temporary connections.

Posted by be...@apache.org.
Use Socket::send() for temporary connections.

Also removed send_data and send_file now that they are no longer used.

Review: https://reviews.apache.org/r/27965


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8edab655
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8edab655
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8edab655

Branch: refs/heads/master
Commit: 8edab655c990a310ef6ac66e64f116addcaf147c
Parents: 71de11e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:50:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp |  17 -----
 3rdparty/libprocess/src/process.cpp | 123 +------------------------------
 2 files changed, 1 insertion(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 7afde0e..2b0d83f 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -28,11 +28,6 @@ const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
 // Forward declarations.
 class Encoder;
 
-extern void send_data(Encoder*);
-extern void send_file(Encoder*);
-
-typedef void (*Sender)(Encoder*);
-
 
 class Encoder
 {
@@ -45,8 +40,6 @@ public:
   explicit Encoder(const Socket& _s) : s(_s) {}
   virtual ~Encoder() {}
 
-  virtual Sender sender() = 0;
-
   virtual Kind kind() const = 0;
 
   virtual void backup(size_t length) = 0;
@@ -71,11 +64,6 @@ public:
 
   virtual ~DataEncoder() {}
 
-  virtual Sender sender()
-  {
-    return send_data;
-  }
-
   virtual Kind kind() const
   {
     return Encoder::DATA;
@@ -254,11 +242,6 @@ public:
     os::close(fd);
   }
 
-  virtual Sender sender()
-  {
-    return send_file;
-  }
-
   virtual Kind kind() const
   {
     return Encoder::FILE;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b062b85..9f91020 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,125 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-void send_data(Encoder* e)
-{
-  DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
-
-  int s = encoder->socket();
-
-  while (true) {
-    const void* data;
-    size_t size;
-
-    data = encoder->next(&size);
-    CHECK(size > 0);
-
-    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      encoder->backup(size);
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      encoder->backup(size);
-      io::poll(s, io::WRITE)
-        .onAny(lambda::bind(&send_data, e));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      socket_manager->close(s);
-      delete encoder;
-      break;
-    } else {
-      CHECK(length > 0);
-
-      // Update the encoder with the amount sent.
-      encoder->backup(size - length);
-
-      // See if there is any more of the message to send.
-      if (encoder->remaining() == 0) {
-        delete encoder;
-
-        // Check for more stuff to send on socket.
-        Encoder* next = socket_manager->next(s);
-        if (next != NULL) {
-          io::poll(s, io::WRITE)
-            .onAny(lambda::bind(next->sender(), next));
-        }
-        break;
-      }
-    }
-  }
-}
-
-
-void send_file(Encoder* e)
-{
-  FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e));
-
-  int s = encoder->socket();
-
-  while (true) {
-    int fd;
-    off_t offset;
-    size_t size;
-
-    fd = encoder->next(&offset, &size);
-    CHECK(size > 0);
-
-    ssize_t length = os::sendfile(s, fd, offset, size);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      encoder->backup(size);
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      encoder->backup(size);
-      io::poll(s, io::WRITE)
-        .onAny(lambda::bind(&send_file, e));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      socket_manager->close(s);
-      delete encoder;
-      break;
-    } else {
-      CHECK(length > 0);
-
-      // Update the encoder with the amount sent.
-      encoder->backup(size - length);
-
-      // See if there is any more of the message to send.
-      if (encoder->remaining() == 0) {
-        delete encoder;
-
-        // Check for more stuff to send on socket.
-        Encoder* next = socket_manager->next(s);
-        if (next != NULL) {
-          io::poll(s, io::WRITE)
-            .onAny(lambda::bind(next->sender(), next));
-        }
-        break;
-      }
-    }
-  }
-}
-
-
 namespace internal {
 
 void decode_read(
@@ -1841,9 +1722,7 @@ void send_connect(const Future<Socket>& socket, Message* message)
         data,
         size));
 
-  // Start polling in order to send data.
-  io::poll(socket.get(), io::WRITE)
-    .onAny(lambda::bind(&send_data, encoder));
+  internal::send(encoder, new Socket(socket.get()));
 }
 
 } // namespace internal {


[16/30] mesos git commit: Replaced circular include dependency w/ TODO.

Posted by be...@apache.org.
Replaced circular include dependency w/ TODO.

Review: https://reviews.apache.org/r/28061


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1d43403
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1d43403
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1d43403

Branch: refs/heads/master
Commit: d1d43403b49c4692ca93e545cd94d0c277719b05
Parents: b774ecb
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Nov 14 15:56:41 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:27:57 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp   | 5 ++++-
 3rdparty/libprocess/include/process/timeout.hpp | 1 -
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d1d43403/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index db0fb04..1fd418b 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -4,7 +4,10 @@
 #include <list>
 
 #include <process/time.hpp>
-#include <process/timer.hpp>
+
+// TODO(benh): We should really be including <process/timer.hpp> but
+// there are currently too many circular dependencies in header files
+// that would need to get moved to translation units first.
 
 #include <stout/duration.hpp>
 #include <stout/lambda.hpp>

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1d43403/3rdparty/libprocess/include/process/timeout.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timeout.hpp b/3rdparty/libprocess/include/process/timeout.hpp
index 0bf63e1..5c46d70 100644
--- a/3rdparty/libprocess/include/process/timeout.hpp
+++ b/3rdparty/libprocess/include/process/timeout.hpp
@@ -2,7 +2,6 @@
 #define __PROCESS_TIMEOUT_HPP__
 
 #include <process/clock.hpp>
-
 #include <process/time.hpp>
 
 #include <stout/duration.hpp>


[25/30] mesos git commit: Add connect() to the Socket interface.

Posted by be...@apache.org.
Add connect() to the Socket interface.

Review: https://reviews.apache.org/r/27958


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d616f8d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d616f8d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d616f8d

Branch: refs/heads/master
Commit: 4d616f8d7194be9b251aa5a3ddc51783c4d4411a
Parents: 702b382
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  41 ++++-
 3rdparty/libprocess/src/process.cpp            | 193 +++++++++++---------
 2 files changed, 138 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 9f4302e..fdad91f 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,12 @@
 
 #include <assert.h>
 
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/node.hpp>
+
 #include <stout/abort.hpp>
-#include <stout/memory.hpp>
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
 #include <stout/try.hpp>
@@ -14,7 +18,8 @@ namespace process {
 
 // Returns a socket fd for the specified options. Note that on OS X,
 // the returned socket will have the SO_NOSIGPIPE option set.
-inline Try<int> socket(int family, int type, int protocol) {
+inline Try<int> socket(int family, int type, int protocol)
+{
   int s;
   if ((s = ::socket(family, type, protocol)) == -1) {
     return ErrnoError();
@@ -41,7 +46,12 @@ inline Try<int> socket(int family, int type, int protocol) {
 class Socket
 {
 public:
-  class Impl
+  // Each socket is a reference counted, shared by default, concurrent
+  // object. However, since we want to support multiple
+  // implementations we use the Pimpl pattern (often called the
+  // compilation firewall) rather than forcing each Socket
+  // implementation to do this themselves.
+  class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
     Impl() : s(-1) {}
@@ -53,8 +63,8 @@ public:
       if (s >= 0) {
         Try<Nothing> close = os::close(s);
         if (close.isError()) {
-          ABORT(
-            "Failed to close socket " + stringify(s) + ": " + close.error());
+          ABORT("Failed to close socket " +
+                stringify(s) + ": " + close.error());
         }
       }
     }
@@ -64,6 +74,8 @@ public:
       return s >= 0 ? s : create().get();
     }
 
+    Future<Socket> connect(const Node& node);
+
   private:
     const Impl& create() const
     {
@@ -78,6 +90,11 @@ public:
     }
 
     // Mutable so that the socket can be lazily created.
+    //
+    // TODO(benh): Create a factory for sockets and don't lazily
+    // create but instead return a Try<Socket> from the factory in
+    // order to eliminate the need for a mutable member or the call to
+    // ABORT above.
     mutable int s;
   };
 
@@ -95,8 +112,20 @@ public:
     return impl->get();
   }
 
+  int get() const
+  {
+    return impl->get();
+  }
+
+  Future<Socket> connect(const Node& node)
+  {
+    return impl->connect(node);
+  }
+
 private:
-  memory::shared_ptr<Impl> impl;
+  explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+
+  std::shared_ptr<Impl> impl;
 };
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 48e5486..6916cbb 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1484,6 +1484,48 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
 }
 
 
+namespace internal {
+
+Future<Socket> connect(const Socket& socket)
+{
+  // Now check that a successful connection was made.
+  int opt;
+  socklen_t optlen = sizeof(opt);
+  int s = socket.get();
+
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+    // Connect failure.
+    VLOG(1) << "Socket error while connecting";
+    return Failure("Socket error while connecting");
+  }
+
+  return socket;
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::connect(const Node& node)
+{
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = PF_INET;
+  addr.sin_port = htons(node.port);
+  addr.sin_addr.s_addr = node.ip;
+
+  if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+    if (errno != EINPROGRESS) {
+      return Failure(ErrnoError("Failed to connect socket"));
+    }
+
+    return io::poll(get(), io::WRITE)
+      .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+  }
+
+  return Socket(shared_from_this());
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1503,6 +1545,25 @@ Socket SocketManager::accepted(int s)
 }
 
 
+namespace internal {
+
+void link_connect(const Future<Socket>& socket)
+{
+  if (socket.isDiscarded() || socket.isFailed()) {
+    if (socket.isFailed()) {
+      VLOG(1) << "Failed to link, connect: " << socket.failure();
+    }
+    socket_manager->close(socket.get());
+    return;
+  }
+
+  io::poll(socket.get(), io::READ)
+    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+}
+
+} // namespace internal {
+
+
 void SocketManager::link(ProcessBase* process, const UPID& to)
 {
   // TODO(benh): The semantics we want to support for link are such
@@ -1516,58 +1577,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
   CHECK(process != NULL);
 
   synchronized (this) {
+    links[to].insert(process);
+
     // Check if node is remote and there isn't a persistant link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
-      if (socket.isError()) {
-        LOG(FATAL) << "Failed to link, socket: " << socket.error();
-      }
-
-      int s = socket.get();
-
-      Try<Nothing> nonblock = os::nonblock(s);
-      if (nonblock.isError()) {
-        LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
-      }
-
-      Try<Nothing> cloexec = os::cloexec(s);
-      if (cloexec.isError()) {
-        LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
-      }
+      Socket socket;
+      int s = socket;
 
-      sockets[s] = Socket(s);
+      sockets[s] = socket;
       nodes[s] = to.node;
 
       persists[to.node] = s;
 
-      // Try and connect to the node using this socket in order to
-      // start reading data. Note that we don't expect to receive
-      // anything other than HTTP '202 Accepted' responses which we
-      // anyway ignore.  We do, however, want to react when it gets
-      // closed so we can generate appropriate lost events (since this
-      // is a 'link').
-      sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = PF_INET;
-      addr.sin_port = htons(to.node.port);
-      addr.sin_addr.s_addr = to.node.ip;
-
-      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS) {
-          PLOG(FATAL) << "Failed to link, connect";
-        }
-
-        // Wait for socket to be connected.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
-      } else {
-        io::poll(s, io::READ)
-          .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-      }
+      socket.connect(to.node)
+        .onAny(lambda::bind(&internal::link_connect, lambda::_1));
     }
-
-    links[to].insert(process);
   }
 }
 
@@ -1653,11 +1678,40 @@ void SocketManager::send(
 }
 
 
+namespace internal {
+
+void send_connect(const Future<Socket>& socket, Message* message)
+{
+  if (socket.isDiscarded() || socket.isFailed()) {
+    if (socket.isFailed()) {
+      VLOG(1) << "Failed to send, connect: " << socket.failure();
+    }
+    socket_manager->close(socket.get());
+    delete message;
+    return;
+  }
+
+  Encoder* encoder = new MessageEncoder(socket.get(), message);
+
+  // Read and ignore data from this socket. Note that we don't
+  // expect to receive anything other than HTTP '202 Accepted'
+  // responses which we just ignore.
+  io::poll(socket.get(), io::READ)
+    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+
+  // Start polling in order to send data.
+  io::poll(socket.get(), io::WRITE)
+    .onAny(lambda::bind(&send_data, encoder));
+}
+
+} // namespace internal {
+
+
 void SocketManager::send(Message* message)
 {
   CHECK(message != NULL);
 
-  Node node(message->to.node);
+  const Node& node = message->to.node;
 
   synchronized (this) {
     // Check if there is already a socket.
@@ -1670,24 +1724,10 @@ void SocketManager::send(Message* message)
     } else {
       // No peristent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
-      if (socket.isError()) {
-        LOG(FATAL) << "Failed to send, socket: " << socket.error();
-      }
-
-      int s = socket.get();
+      Socket socket;
+      int s = socket;
 
-      Try<Nothing> nonblock = os::nonblock(s);
-      if (nonblock.isError()) {
-        LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
-      }
-
-      Try<Nothing> cloexec = os::cloexec(s);
-      if (cloexec.isError()) {
-        LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
-      }
-
-      sockets[s] = Socket(s);
+      sockets[s] = socket;
       nodes[s] = node;
       temps[node] = s;
 
@@ -1696,35 +1736,8 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      // Create a message encoder to handle sending this message.
-      Encoder* encoder = new MessageEncoder(sockets[s], message);
-
-      // Read and ignore data from this socket. Note that we don't
-      // expect to receive anything other than HTTP '202 Accepted'
-      // responses which we just ignore.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-
-      // Try and connect to the node using this socket.
-      sockaddr_in addr;
-      memset(&addr, 0, sizeof(addr));
-      addr.sin_family = PF_INET;
-      addr.sin_port = htons(node.port);
-      addr.sin_addr.s_addr = node.ip;
-
-      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
-        if (errno != EINPROGRESS) {
-          PLOG(FATAL) << "Failed to send, connect";
-        }
-
-        // Start polling in order to wait for being connected.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&sending_connect, encoder));
-      } else {
-        // Start polling in order to send data.
-        io::poll(s, io::WRITE)
-          .onAny(lambda::bind(&send_data, encoder));
-      }
+      socket.connect(node)
+        .onAny(lambda::bind(&internal::send_connect, lambda::_1, message));
     }
   }
 }


[12/30] mesos git commit: Moved Clock implementation into clock.cpp.

Posted by be...@apache.org.
Moved Clock implementation into clock.cpp.

Review: https://reviews.apache.org/r/27503


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e19796d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e19796d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e19796d

Branch: refs/heads/master
Commit: 0e19796dec8db07e2a60df15b88970dc387fbe21
Parents: 18cc45f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 17:04:25 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |   1 +
 3rdparty/libprocess/src/clock.cpp   | 437 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 410 +----------------------------
 3 files changed, 444 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index a55d562..0008e68 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -30,6 +30,7 @@ PICOJSON = 3rdparty/picojson-$(PICOJSON_VERSION)
 noinst_LTLIBRARIES = libprocess.la
 
 libprocess_la_SOURCES =		\
+  src/clock.cpp			\
   src/config.hpp		\
   src/decoder.hpp		\
   src/encoder.hpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
new file mode 100644
index 0000000..5623b58
--- /dev/null
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -0,0 +1,437 @@
+#include <ev.h>
+
+#include <glog/logging.h>
+
+#include <list>
+#include <map>
+
+#include <process/clock.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/time.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "synchronized.hpp"
+
+using std::list;
+using std::map;
+
+namespace process {
+
+// Event loop (defined in process.cpp).
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting loop to specifically deal
+// with updating timers (see 'update_timer').
+static ev_async async_update_timer_watcher;
+
+// Single libev timer, (re)armed for the earliest pending timeout.
+static ev_timer timeouts_watcher;
+
+// We store the timers in a map of lists indexed by the timeout of the
+// timer so that two timers can have the same timeout. We exploit that
+// the map is SORTED! The (recursive) lock also guards clock:: state.
+static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+
+// Flag to indicate whether or not to update the timer on async interrupt.
+static bool update_timer = false;
+
+
+// We namespace the clock related variables to keep them well
+// named. In the future we'll probably want to associate a clock with
+// a specific ProcessManager/SocketManager instance pair, so this will
+// likely change. Mutated under the 'timeouts' lock (except 'callback').
+namespace clock {
+// Per-process clocks while paused (cleared by Clock::resume()).
+map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
+
+// TODO(dhamon): These static non-POD instances should be replaced by pointers
+// or functions.
+Time initial = Time::epoch(); // Set by Clock::pause().
+Time current = Time::epoch(); // Global time while paused.
+
+Duration advanced = Duration::zero(); // Added by Time::create().
+
+bool paused = false; // Clock::paused() reads this without locking.
+
+// For supporting Clock::settled(), false if we're not currently
+// settling (or we're not paused), true if we're currently attempting
+// to settle (and we're paused).
+bool settling = false;
+
+// Set by Clock::initialize(); invoked with the expired timers.
+lambda::function<void(list<Timer>&&)> callback;
+
+} // namespace clock {
+
+// libev async callback: re-arms 'timeouts_watcher' if 'update_timer' is set.
+void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+{
+  synchronized (timeouts) {
+    if (update_timer) {
+      if (!timeouts->empty()) {
+        // Determine when the next timer should fire.
+        timeouts_watcher.repeat =
+          (timeouts->begin()->first - Clock::now()).secs();
+
+        if (timeouts_watcher.repeat <= 0) {
+          // Already due: feed the event now!
+          timeouts_watcher.repeat = 0;
+          ev_timer_again(loop, &timeouts_watcher); // repeat == 0: stopped.
+          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+        } else {
+          // Don't fire the timer while paused since we don't want
+          // time to advance (Clock::advance() will handle the timer).
+          // The 'repeat > 0' test below is redundant in this branch.
+          if (Clock::paused() && timeouts_watcher.repeat > 0) {
+            timeouts_watcher.repeat = 0;
+          }
+
+          ev_timer_again(loop, &timeouts_watcher); // Stopped if repeat == 0.
+        }
+      }
+
+      update_timer = false;
+    }
+  }
+}
+
+// libev timer callback: passes expired timers to clock::callback().
+void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+{
+  list<Timer> timers; // Expired timers, handed off after unlocking.
+
+  synchronized (timeouts) {
+    Time now = Clock::now();
+
+    VLOG(3) << "Handling timeouts up to " << now;
+
+    foreachkey (const Time& timeout, *timeouts) { // Ascending (sorted).
+      if (timeout > now) {
+        break;
+      }
+
+      VLOG(3) << "Have timeout(s) at " << timeout;
+
+      // Need to toggle 'settling' so that we don't prematurely say
+      // we're settled until after the timers are executed below,
+      // outside of the critical section.
+      if (clock::paused) {
+        clock::settling = true;
+      }
+
+      foreach (const Timer& timer, (*timeouts)[timeout]) {
+        timers.push_back(timer);
+      }
+    }
+
+    // Now erase the range of timeouts that timed out.
+    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+
+    // Okay, so the timeout for the next timer should not have fired.
+    CHECK(timeouts->empty() || (timeouts->begin()->first > now));
+
+    // Update the timer as necessary.
+    if (!timeouts->empty()) {
+      // Determine when the next timer should fire.
+      timeouts_watcher.repeat =
+        (timeouts->begin()->first - Clock::now()).secs();
+
+      if (timeouts_watcher.repeat <= 0) {
+        // Already due per the re-read clock: feed the event now!
+        timeouts_watcher.repeat = 0;
+        ev_timer_again(loop, &timeouts_watcher);
+        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+      } else {
+        // Don't fire the timer while paused since we don't want time
+        // to advance (Clock::advance() will handle the timer). The
+        // 'repeat > 0' test below is redundant in this branch.
+        if (Clock::paused() && timeouts_watcher.repeat > 0) {
+          timeouts_watcher.repeat = 0;
+        }
+
+        ev_timer_again(loop, &timeouts_watcher); // Stopped if repeat == 0.
+      }
+    }
+
+    update_timer = false; // Since we might have a queued update_timer.
+  }
+
+  clock::callback(std::move(timers)); // Not holding the 'timeouts' lock.
+
+  // Mark 'settling' as false only if no more timeouts will expire
+  // at or before the paused time, now that we've finished executing
+  // the expired timers.
+  synchronized (timeouts) {
+    if (clock::paused &&
+        (timeouts->size() == 0 ||
+         timeouts->begin()->first > clock::current)) {
+      VLOG(3) << "Clock has settled";
+      clock::settling = false;
+    }
+  }
+}
+
+// Stores the expired-timers callback and starts the libev clock watchers.
+void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
+{
+  // TODO(benh): Currently this function is expected to get called
+  // just after initializing libev in process::initialize. But that is
+  // too tightly coupled and we really need to move libev specific
+  // initialization outside of process::initialize so that both
+  // process::initialize and Clock::initialize can depend on it (and
+  // thus call it).
+
+  clock::callback = callback; // NOTE(review): copies; std::move it?
+
+  ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
+  ev_async_start(loop, &async_update_timer_watcher);
+  // Initial repeat of 2100000 s (~24.3 days); the handlers reset it.
+  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
+  ev_timer_again(loop, &timeouts_watcher);
+}
+
+// Shorthand for Clock::now(__process__).
+Time Clock::now()
+{
+  return now(__process__); // __process__ may be NULL.
+}
+
+// Simulated time while paused (per-process unless NULL), else ev_time().
+Time Clock::now(ProcessBase* process)
+{
+  synchronized (timeouts) {
+    if (Clock::paused()) {
+      if (process != NULL) {
+        if (clock::currents->count(process) != 0) {
+          return (*clock::currents)[process];
+        } else {
+          return (*clock::currents)[process] = clock::initial; // First use.
+        }
+      } else {
+        return clock::current; // Global paused time.
+      }
+    }
+  }
+
+  // TODO(benh): Versus ev_now()?
+  double d = ev_time(); // Wall-clock time in seconds.
+  Try<Time> time = Time::create(d); // Compensates for clock::advanced.
+
+  // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
+  // here.
+  if (time.isError()) {
+    LOG(FATAL) << "Failed to create a Time from " << d << ": "
+               << time.error();
+  }
+  return time.get();
+}
+
+// Registers a Timer for 'thunk' that expires 'duration' from now.
+Timer Clock::timer(
+    const Duration& duration,
+    const lambda::function<void(void)>& thunk)
+{
+  static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
+
+  // Assumes Clock::now() does Clock::now(__process__).
+  Timeout timeout = Timeout::in(duration);
+
+  UPID pid = __process__ != NULL ? __process__->self() : UPID();
+
+  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk); // Atomic id.
+
+  VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
+          << " in the future (" << timeout.time() << ")";
+
+  // Add the timer, waking the loop if it is the new earliest timeout.
+  synchronized (timeouts) {
+    if (timeouts->size() == 0 ||
+        timer.timeout().time() < timeouts->begin()->first) {
+      // Need to interrupt the loop to update/set timer repeat.
+      (*timeouts)[timer.timeout().time()].push_back(timer);
+      update_timer = true;
+      ev_async_send(loop, &async_update_timer_watcher);
+    } else {
+      // Timer repeat is adequate (not a new earliest), just add it.
+      CHECK(timeouts->size() >= 1); // Always true in this branch.
+      (*timeouts)[timer.timeout().time()].push_back(timer);
+    }
+  }
+
+  return timer;
+}
+
+// Removes 'timer' if its timeout is still pending.
+bool Clock::cancel(const Timer& timer)
+{
+  bool canceled = false;
+  synchronized (timeouts) {
+    // Check if the timeout is still pending, and if so, remove the
+    // timer (erasing the list if it was the last). NOTE(review): this
+    // returns true if any timer has this timeout, even if not 'timer'.
+    Time time = timer.timeout().time();
+    if (timeouts->count(time) > 0) {
+      canceled = true;
+      (*timeouts)[time].remove(timer);
+      if ((*timeouts)[time].empty()) {
+        timeouts->erase(time);
+      }
+    }
+  }
+
+  return canceled;
+}
+
+// Freezes the clock at the current time (no-op if already paused).
+void Clock::pause()
+{
+  process::initialize(); // To make sure the libev watchers are ready.
+
+  synchronized (timeouts) {
+    if (!clock::paused) {
+      clock::initial = clock::current = now(); // Not paused yet here.
+      clock::paused = true;
+      VLOG(2) << "Clock paused at " << clock::initial;
+    }
+  }
+
+  // Note that after pausing the clock an existing libev timer might
+  // still fire (invoking handle_timeouts), but since paused == true no
+  // "time" will actually have passed, so no timer will actually fire.
+}
+
+// Whether the clock is currently paused.
+bool Clock::paused()
+{
+  return clock::paused; // Read without taking the 'timeouts' lock.
+}
+
+// Returns to unpaused time (no-op unless paused).
+void Clock::resume()
+{
+  process::initialize(); // To make sure the libev watchers are ready.
+
+  synchronized (timeouts) {
+    if (clock::paused) {
+      VLOG(2) << "Clock resumed at " << clock::current;
+      clock::paused = false; // 'advanced' is kept (see Time::create()).
+      clock::settling = false;
+      clock::currents->clear(); // Per-process clocks apply only while paused.
+      update_timer = true;
+      ev_async_send(loop, &async_update_timer_watcher); // Re-arm timer.
+    }
+  }
+}
+
+// Moves the paused global clock forward (no-op unless paused).
+void Clock::advance(const Duration& duration)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      clock::advanced += duration; // Also applied by Time::create().
+      clock::current += duration;
+      VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
+      if (!update_timer) { // An update may already be queued.
+        update_timer = true;
+        ev_async_send(loop, &async_update_timer_watcher);
+      }
+    }
+  }
+}
+
+// Moves only 'process's paused clock forward (no-op unless paused).
+void Clock::advance(ProcessBase* process, const Duration& duration)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      Time current = now(process); // Re-locks (recursive lock).
+      current += duration;
+      (*clock::currents)[process] = current; // No timer update queued.
+      VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
+              << ") to " << current;
+    }
+  }
+}
+
+// Moves the paused global clock forward to 'time' (never backwards).
+void Clock::update(const Time& time)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      if (clock::current < time) {
+        clock::advanced += (time - clock::current); // Same delta as current.
+        clock::current = Time(time);
+        VLOG(2) << "Clock updated to " << clock::current;
+        if (!update_timer) { // An update may already be queued.
+          update_timer = true;
+          ev_async_send(loop, &async_update_timer_watcher);
+        }
+      }
+    }
+  }
+}
+
+// Sets 'process's paused clock to 'time' if later, or always if FORCE.
+void Clock::update(ProcessBase* process, const Time& time, Update update)
+{
+  synchronized (timeouts) {
+    if (clock::paused) {
+      if (now(process) < time || update == Clock::FORCE) {
+        VLOG(2) << "Clock of " << process->self() << " updated to " << time;
+        (*clock::currents)[process] = Time(time); // No timer update queued.
+      }
+    }
+  }
+}
+
+// Updates 'to's paused clock from 'from's clock (no-op unless paused).
+void Clock::order(ProcessBase* from, ProcessBase* to)
+{
+  VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
+  update(to, now(from)); // Uses the default 'Update' argument (clock.hpp).
+}
+
+// Whether no due timers remain to be handled; CHECKs that we're paused.
+bool Clock::settled()
+{
+  synchronized (timeouts) {
+    CHECK(clock::paused);
+
+    if (update_timer) { // A queued re-arm may still expire timers.
+      return false;
+    } else if (clock::settling) { // Expired timers still executing.
+      VLOG(3) << "Clock still not settled";
+      return false;
+    } else if (timeouts->size() == 0 ||
+               timeouts->begin()->first > clock::current) {
+      VLOG(3) << "Clock is settled";
+      return true;
+    }
+
+    VLOG(3) << "Clock is not settled"; // Due timeouts not yet handled.
+    return false;
+  }
+}
+
+
+// TODO(benh): Introduce a Clock::time(seconds) that replaces this
+// function for getting a 'Time' value.
+Try<Time> Time::create(double seconds)
+{
+  Try<Duration> duration = Duration::create(seconds);
+  if (duration.isSome()) {
+    // Adds the advance()/update() offset (zero in production code!).
+    return Time(duration.get() + clock::advanced); // Unlocked read.
+  } else {
+    return Error("Argument too large for Time: " + duration.error());
+  }
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a0b4ca0..bac4200 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -447,19 +447,12 @@ static SocketManager* socket_manager = NULL;
 static ProcessManager* process_manager = NULL;
 
 // Event loop.
-static struct ev_loop* loop = NULL;
+struct ev_loop* loop = NULL; // Non-static: clock.cpp declares it extern.
 
 // Asynchronous watcher for interrupting loop to specifically deal
 // with IO watchers and functions (via run_in_event_loop).
 static ev_async async_watcher;
 
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
 // Server watcher for accepting connections.
 static ev_io server_watcher;
 
@@ -475,15 +468,6 @@ static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
 static queue<lambda::function<void(void)> >* functions =
   new queue<lambda::function<void(void)> >();
 
-// We store the timers in a map of lists indexed by the timeout of the
-// timer so that we can have two timers that have the same timeout. We
-// exploit that the map is SORTED!
-static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
-
 // Scheduling gate that threads wait at when there is nothing to run.
 static Gate* gate = new Gate();
 
@@ -509,227 +493,15 @@ ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
 // const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
 
 
-// We namespace the clock related variables to keep them well
-// named. In the future we'll probably want to associate a clock with
-// a specific ProcessManager/SocketManager instance pair, so this will
-// likely change.
-namespace clock {
-
-map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
-
-// TODO(dhamon): These static non-POD instances should be replaced by pointers
-// or functions.
-Time initial = Time::epoch();
-Time current = Time::epoch();
-
-Duration advanced = Duration::zero();
-
-bool paused = false;
-
-// For supporting Clock::settled(), false if we're not currently
-// settling (or we're not paused), true if we're currently attempting
-// to settle (and we're paused).
-bool settling = false;
-
-// Lambda function to invoke when timers have expired.
-lambda::function<void(list<Timer>&&)> callback;
-
-} // namespace clock {
-
-
-void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
-{
-  clock::callback = callback;
-}
-
-
-Time Clock::now()
-{
-  return now(__process__);
-}
-
-
-Time Clock::now(ProcessBase* process)
-{
-  synchronized (timeouts) {
-    if (Clock::paused()) {
-      if (process != NULL) {
-        if (clock::currents->count(process) != 0) {
-          return (*clock::currents)[process];
-        } else {
-          return (*clock::currents)[process] = clock::initial;
-        }
-      } else {
-        return clock::current;
-      }
-    }
-  }
-
-  // TODO(benh): Versus ev_now()?
-  double d = ev_time();
-  Try<Time> time = Time::create(d); // Compensates for clock::advanced.
-
-  // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
-  // here.
-  if (time.isError()) {
-    LOG(FATAL) << "Failed to create a Time from " << d << ": "
-               << time.error();
-  }
-  return time.get();
-}
-
-
-void Clock::pause()
-{
-  process::initialize(); // To make sure the libev watchers are ready.
-
-  synchronized (timeouts) {
-    if (!clock::paused) {
-      clock::initial = clock::current = now();
-      clock::paused = true;
-      VLOG(2) << "Clock paused at " << clock::initial;
-    }
-  }
-
-  // Note that after pausing the clock an existing libev timer might
-  // still fire (invoking handle_timeout), but since paused == true no
-  // "time" will actually have passed, so no timer will actually fire.
-}
-
-
-bool Clock::paused()
-{
-  return clock::paused;
-}
-
-
-void Clock::resume()
-{
-  process::initialize(); // To make sure the libev watchers are ready.
-
-  synchronized (timeouts) {
-    if (clock::paused) {
-      VLOG(2) << "Clock resumed at " << clock::current;
-      clock::paused = false;
-      clock::settling = false;
-      clock::currents->clear();
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
-    }
-  }
-}
-
-
-void Clock::advance(const Duration& duration)
-{
-  synchronized (timeouts) {
-    if (clock::paused) {
-      clock::advanced += duration;
-      clock::current += duration;
-      VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
-      if (!update_timer) {
-        update_timer = true;
-        ev_async_send(loop, &async_update_timer_watcher);
-      }
-    }
-  }
-}
-
-
-void Clock::advance(ProcessBase* process, const Duration& duration)
-{
-  synchronized (timeouts) {
-    if (clock::paused) {
-      Time current = now(process);
-      current += duration;
-      (*clock::currents)[process] = current;
-      VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
-              << ") to " << current;
-    }
-  }
-}
-
-
-void Clock::update(const Time& time)
-{
-  synchronized (timeouts) {
-    if (clock::paused) {
-      if (clock::current < time) {
-        clock::advanced += (time - clock::current);
-        clock::current = Time(time);
-        VLOG(2) << "Clock updated to " << clock::current;
-        if (!update_timer) {
-          update_timer = true;
-          ev_async_send(loop, &async_update_timer_watcher);
-        }
-      }
-    }
-  }
-}
-
-
-void Clock::update(ProcessBase* process, const Time& time, Update update)
-{
-  synchronized (timeouts) {
-    if (clock::paused) {
-      if (now(process) < time || update == Clock::FORCE) {
-        VLOG(2) << "Clock of " << process->self() << " updated to " << time;
-        (*clock::currents)[process] = Time(time);
-      }
-    }
-  }
-}
-
-
-void Clock::order(ProcessBase* from, ProcessBase* to)
-{
-  VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
-  update(to, now(from));
-}
-
-
+// NOTE: Clock::* implementations are in clock.cpp except for
+// Clock::settle (depends on 'process_manager'). NOTE(review): its old
+// CHECK(clock::paused) was dropped; confirm paused is still enforced.
 void Clock::settle()
 {
-  CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
   process_manager->settle();
 }
 
 
-bool Clock::settled()
-{
-  synchronized (timeouts) {
-    CHECK(clock::paused);
-
-    if (update_timer) {
-      return false;
-    } else if (clock::settling) {
-      VLOG(3) << "Clock still not settled";
-      return false;
-    } else if (timeouts->size() == 0 ||
-               timeouts->begin()->first > clock::current) {
-      VLOG(3) << "Clock is settled";
-      return true;
-    }
-
-    VLOG(3) << "Clock is not settled";
-
-    return false;
-  }
-}
-
-
-Try<Time> Time::create(double seconds)
-{
-  Try<Duration> duration = Duration::create(seconds);
-  if (duration.isSome()) {
-    // In production code, clock::advanced will always be zero!
-    return Time(duration.get() + clock::advanced);
-  } else {
-    return Error("Argument too large for Time: " + duration.error());
-  }
-}
-
-
 static Message* encode(const UPID& from,
                        const UPID& to,
                        const string& name,
@@ -874,114 +646,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
-{
-  synchronized (timeouts) {
-    if (update_timer) {
-      if (!timeouts->empty()) {
-        // Determine when the next timer should fire.
-        timeouts_watcher.repeat =
-          (timeouts->begin()->first - Clock::now()).secs();
-
-        if (timeouts_watcher.repeat <= 0) {
-          // Feed the event now!
-          timeouts_watcher.repeat = 0;
-          ev_timer_again(loop, &timeouts_watcher);
-          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-        } else {
-          // Don't fire the timer if the clock is paused since we
-          // don't want time to advance (instead a call to
-          // clock::advance() will handle the timer).
-          if (Clock::paused() && timeouts_watcher.repeat > 0) {
-            timeouts_watcher.repeat = 0;
-          }
-
-          ev_timer_again(loop, &timeouts_watcher);
-        }
-      }
-
-      update_timer = false;
-    }
-  }
-}
-
-
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
-{
-  list<Timer> timers;
-
-  synchronized (timeouts) {
-    Time now = Clock::now();
-
-    VLOG(3) << "Handling timeouts up to " << now;
-
-    foreachkey (const Time& timeout, *timeouts) {
-      if (timeout > now) {
-        break;
-      }
-
-      VLOG(3) << "Have timeout(s) at " << timeout;
-
-      // Need to toggle 'settling' so that we don't prematurely say
-      // we're settled until after the timers are executed below,
-      // outside of the critical section.
-      if (clock::paused) {
-        clock::settling = true;
-      }
-
-      foreach (const Timer& timer, (*timeouts)[timeout]) {
-        timers.push_back(timer);
-      }
-    }
-
-    // Now erase the range of timeouts that timed out.
-    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
-
-    // Okay, so the timeout for the next timer should not have fired.
-    CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
-    // Update the timer as necessary.
-    if (!timeouts->empty()) {
-      // Determine when the next timer should fire.
-      timeouts_watcher.repeat =
-        (timeouts->begin()->first - Clock::now()).secs();
-
-      if (timeouts_watcher.repeat <= 0) {
-        // Feed the event now!
-        timeouts_watcher.repeat = 0;
-        ev_timer_again(loop, &timeouts_watcher);
-        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-      } else {
-        // Don't fire the timer if the clock is paused since we don't
-        // want time to advance (instead a call to Clock::advance()
-        // will handle the timer).
-        if (Clock::paused() && timeouts_watcher.repeat > 0) {
-          timeouts_watcher.repeat = 0;
-        }
-
-        ev_timer_again(loop, &timeouts_watcher);
-      }
-    }
-
-    update_timer = false; // Since we might have a queued update_timer.
-  }
-
-  clock::callback(std::move(timers));
-
-  // Mark 'settling' as false since there are not any more timeouts
-  // that will expire before the paused time and we've finished
-  // executing expired timers.
-  synchronized (timeouts) {
-    if (clock::paused &&
-        (timeouts->size() == 0 ||
-         timeouts->begin()->first > clock::current)) {
-      VLOG(3) << "Clock has settled";
-      clock::settling = false;
-    }
-  }
-}
-
-
 void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   DataDecoder* decoder = (DataDecoder*) watcher->data;
@@ -1501,8 +1165,6 @@ void initialize(const string& delegate)
   process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
 
-  Clock::initialize(lambda::bind(&timedout, lambda::_1));
-
   // Setup processing threads.
   // We create no fewer than 8 threads because some tests require
   // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
@@ -1628,15 +1290,11 @@ void initialize(const string& delegate)
   ev_async_init(&async_watcher, handle_async);
   ev_async_start(loop, &async_watcher);
 
-  ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
-  ev_async_start(loop, &async_update_timer_watcher);
-
-  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
-  ev_timer_again(loop, &timeouts_watcher);
-
   ev_io_init(&server_watcher, accept, __s__, EV_READ);
   ev_io_start(loop, &server_watcher);
 
+  Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
 //   ev_child_start(loop, &cw);
 
@@ -3215,62 +2873,6 @@ Future<Response> ProcessManager::__processes__(const Request&)
 }
 
 
-Timer Clock::timer(
-    const Duration& duration,
-    const lambda::function<void(void)>& thunk)
-{
-  static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
-
-  // Assumes Clock::now() does Clock::now(__process__).
-  Timeout timeout = Timeout::in(duration);
-
-  UPID pid = __process__ != NULL ? __process__->self() : UPID();
-
-  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
-
-  VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
-          << " in the future (" << timeout.time() << ")";
-
-  // Add the timer.
-  synchronized (timeouts) {
-    if (timeouts->size() == 0 ||
-        timer.timeout().time() < timeouts->begin()->first) {
-      // Need to interrupt the loop to update/set timer repeat.
-      (*timeouts)[timer.timeout().time()].push_back(timer);
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
-    } else {
-      // Timer repeat is adequate, just add the timeout.
-      CHECK(timeouts->size() >= 1);
-      (*timeouts)[timer.timeout().time()].push_back(timer);
-    }
-  }
-
-  return timer;
-}
-
-
-bool Clock::cancel(const Timer& timer)
-{
-  bool canceled = false;
-  synchronized (timeouts) {
-    // Check if the timeout is still pending, and if so, erase it. In
-    // addition, erase an empty list if we just removed the last
-    // timeout.
-    Time time = timer.timeout().time();
-    if (timeouts->count(time) > 0) {
-      canceled = true;
-      (*timeouts)[time].remove(timer);
-      if ((*timeouts)[time].empty()) {
-        timeouts->erase(time);
-      }
-    }
-  }
-
-  return canceled;
-}
-
-
 ProcessBase::ProcessBase(const string& id)
 {
   process::initialize();


[10/30] mesos git commit: Moved process::io::* into io.cpp.

Posted by be...@apache.org.
Moved process::io::* into io.cpp.

Review: https://reviews.apache.org/r/27506


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37bba65e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37bba65e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37bba65e

Branch: refs/heads/master
Commit: 37bba65e897e9e06640b7f126fe4871ab917f1ce
Parents: 413ce94
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:28:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |   1 +
 3rdparty/libprocess/src/io.cpp      | 647 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 633 ------------------------------
 3 files changed, 648 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 41c3bd1..aebd281 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -37,6 +37,7 @@ libprocess_la_SOURCES =		\
   src/gate.hpp			\
   src/help.cpp			\
   src/http.cpp			\
+  src/io.cpp			\
   src/latch.cpp			\
   src/libev.hpp			\
   src/libev.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
new file mode 100644
index 0000000..75fe75c
--- /dev/null
+++ b/3rdparty/libprocess/src/io.cpp
@@ -0,0 +1,647 @@
+#include <string>
+
+#include <boost/shared_array.hpp>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+using std::string;
+
+namespace process {
+namespace io {
+namespace internal {
+
+// Attempts one non-blocking ::read of up to 'size' bytes from 'fd'
+// into 'data' and completes 'promise' with the number of bytes read.
+// It is called directly first (with an already-ready 'future') and
+// afterwards as the continuation of io::poll whenever the descriptor
+// had no data available yet.
+void read(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // A discard was requested on the caller's future: honor it and stop.
+  if (promise->future().hasDiscard()) {
+    CHECK(!future.isPending());
+    promise->discard();
+    return;
+  }
+
+  // Nothing to read; succeed without touching the descriptor.
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  // Propagate any problem with the preceding poll.
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+    return;
+  }
+
+  if (future.isFailed()) {
+    promise->fail(future.failure());
+    return;
+  }
+
+  const ssize_t length = ::read(fd, data, size);
+
+  if (length >= 0) {
+    promise->set(length);
+    return;
+  }
+
+  const bool retry =
+    errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK;
+
+  if (!retry) {
+    // Error occurred.
+    promise->fail(strerror(errno));
+    return;
+  }
+
+  // Interrupted or not ready yet: wait for readability, then retry.
+  Future<short> polling = io::poll(fd, process::io::READ).onAny(
+      lambda::bind(&internal::read, fd, data, size, promise, lambda::_1));
+
+  // Stop polling if a discard occurs on our future.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<short>,
+                   WeakFuture<short>(polling)));
+}
+
+
+// Attempts one non-blocking ::write of up to 'size' bytes from 'data'
+// to 'fd' and completes 'promise' with the number of bytes written.
+// It is called directly first and then as the continuation of
+// io::poll whenever the descriptor was not yet writable. A write to a
+// pipe or socket whose reading end is closed fails the promise; the
+// SIGPIPE it generates is blocked and consumed below.
+void write(
+    int fd,
+    void* data,
+    size_t size,
+    const memory::shared_ptr<Promise<size_t> >& promise,
+    const Future<short>& future)
+{
+  // Ignore this function if the write operation has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
+
+  if (size == 0) {
+    promise->set(0);
+    return;
+  }
+
+  if (future.isDiscarded()) {
+    promise->fail("Failed to poll: discarded future");
+  } else if (future.isFailed()) {
+    promise->fail(future.failure());
+  } else {
+    // Do a write but ignore SIGPIPE so we can return an error when
+    // writing to a pipe or socket where the reading end is closed.
+    // TODO(benh): The 'suppress' macro failed to work on OS X as it
+    // appears that signal delivery was happening asynchronously.
+    // That is, the signal would not appear to be pending when the
+    // 'suppress' block was closed thus the destructor for
+    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+    // It also appeared that the signal would be delivered to another
+    // thread even if it remained blocked in this thread. The
+    // workaround here is to check explicitly for EPIPE and then do
+    // 'sigwait' regardless of what 'os::signals::pending' returns. We
+    // don't have that luxury with 'Suppressor' and arbitrary signals
+    // because we don't always have something like EPIPE to tell us
+    // that a signal is (or will soon be) pending.
+    bool pending = os::signals::pending(SIGPIPE);
+    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+    ssize_t length = ::write(fd, data, size);
+
+    // Save the errno so we can restore it after doing sig* functions
+    // below.
+    int errno_ = errno;
+
+    if (length < 0 && errno_ == EPIPE && !pending) {
+      sigset_t mask;
+      sigemptyset(&mask);
+      sigaddset(&mask, SIGPIPE);
+
+      // Consume the SIGPIPE raised by the failed write. POSIX
+      // sigwait() reports errors through its return value (an error
+      // number) rather than by returning -1 and setting errno, so the
+      // result itself is compared against EINTR. The errno-based test
+      // is kept for implementations using the -1/errno convention.
+      int result;
+      do {
+        int ignored;
+        result = sigwait(&mask, &ignored);
+      } while (result == EINTR || (result == -1 && errno == EINTR));
+    }
+
+    if (unblock) {
+      os::signals::unblock(SIGPIPE);
+    }
+
+    errno = errno_;
+
+    if (length < 0) {
+      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Restart the write operation.
+        Future<short> future =
+          io::poll(fd, process::io::WRITE).onAny(
+              lambda::bind(&internal::write,
+                           fd,
+                           data,
+                           size,
+                           promise,
+                           lambda::_1));
+
+        // Stop polling if a discard occurs on our future.
+        promise->future().onDiscard(
+            lambda::bind(&process::internal::discard<short>,
+                         WeakFuture<short>(future)));
+      } else {
+        // Error occurred.
+        promise->fail(strerror(errno));
+      }
+    } else {
+      // TODO(benh): Retry if 'length' is 0?
+      promise->set(length);
+    }
+  }
+}
+
+} // namespace internal {
+
+
+// Starts an asynchronous read of at most 'size' bytes from 'fd' into
+// 'data'. The future is satisfied with the number of bytes read. It
+// fails if 'fd' cannot be inspected (e.g., it was closed) or is not in
+// non-blocking mode.
+Future<size_t> read(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  Try<bool> nonblock = os::isNonblock(fd);
+
+  if (nonblock.isError()) {
+    // The descriptor could not be inspected (e.g., it has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblock.error());
+  } else if (!nonblock.get()) {
+    // Only non-blocking descriptors are supported.
+    promise->fail("Expected a non-blocking file descriptor");
+  } else {
+    // Since 'fd' is non-blocking, attempt the read right away and only
+    // fall back to polling when no data is available yet. This avoids
+    // unnecessary polling. Polling was also observed to block for
+    // non-deterministically long periods with some combinations of
+    // libev (3.8 at the time of writing) and Linux kernel versions.
+    internal::read(fd, data, size, promise, io::READ);
+  }
+
+  return promise->future();
+}
+
+
+// Starts an asynchronous write of at most 'size' bytes from 'data' to
+// 'fd'. The future is satisfied with the number of bytes written,
+// which may be fewer than 'size'. It fails if 'fd' cannot be inspected
+// (e.g., it was closed) or is not in non-blocking mode.
+Future<size_t> write(int fd, void* data, size_t size)
+{
+  process::initialize();
+
+  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+  Try<bool> nonblock = os::isNonblock(fd);
+
+  if (nonblock.isError()) {
+    // The descriptor could not be inspected (e.g., it has been closed).
+    promise->fail(
+        "Failed to check if file descriptor was non-blocking: " +
+        nonblock.error());
+  } else if (!nonblock.get()) {
+    // Only non-blocking descriptors are supported.
+    promise->fail("Expected a non-blocking file descriptor");
+  } else {
+    // Since 'fd' is non-blocking, attempt the write right away and
+    // only fall back to polling when the descriptor is not writable
+    // yet. This avoids unnecessary polling. Polling was also observed
+    // to block for non-deterministically long periods with some
+    // combinations of libev (3.8 at the time of writing) and Linux
+    // kernel versions.
+    internal::write(fd, data, size, promise, io::WRITE);
+  }
+
+  return promise->future();
+}
+
+
+namespace internal {
+
+#if __cplusplus >= 201103L
+// Reads 'fd' in chunks of at most 'length' bytes, using 'data' as the
+// scratch buffer, and appends every chunk to 'buffer'. When a read
+// returns 0 bytes (EOF), it yields a copy of the accumulated contents.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  auto accumulate = [=] (size_t size) -> Future<string> {
+    if (size != 0) {
+      buffer->append(data.get(), size);
+      return _read(fd, buffer, data, length);
+    }
+    return string(*buffer); // EOF.
+  };
+
+  return io::read(fd, data.get(), length).then(accumulate);
+}
+#else
+// Forward declaration.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length);
+
+
+// Continuation of _read. On EOF ('size' == 0) it yields a copy of the
+// accumulated 'buffer'. Otherwise it appends the 'size' bytes just
+// read into 'data' and issues the next read.
+Future<string> __read(
+    size_t size,
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  if (size != 0) {
+    buffer->append(data.get(), size);
+    return _read(fd, buffer, data, length);
+  }
+
+  return string(*buffer); // EOF.
+}
+
+
+// Reads one chunk of at most 'length' bytes into 'data' and hands the
+// resulting byte count to __read.
+Future<string> _read(
+    int fd,
+    const memory::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
+    size_t length)
+{
+  Future<size_t> chunk = io::read(fd, data.get(), length);
+  return chunk.then(
+      lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+// Writes the bytes of '*data' from offset 'index' onwards to 'fd'.
+// After each partial write it re-issues io::write for the remainder,
+// until all of '*data' has been written.
+// NOTE(review): if io::write ever completes with 0 for a non-empty
+// remainder, this recurses without making progress.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  // io::write takes a non-const pointer even though the bytes are only
+  // handed to ::write and never modified. Cast away the constness of
+  // std::string::data() explicitly instead of with a C-style cast.
+  char* remaining = const_cast<char*>(data->data() + index);
+
+  return io::write(fd, remaining, data->size() - index)
+    .then([=] (size_t length) -> Future<Nothing> {
+      if (index + length == data->size()) {
+        return Nothing();
+      }
+      return _write(fd, data, index + length);
+    });
+}
+#else
+// Forward declaration.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index);
+
+
+// Continuation of _write. It finishes once the 'length' bytes just
+// written reach the end of '*data', and otherwise writes the rest.
+Future<Nothing> __write(
+    int fd,
+    Owned<string> data,
+    size_t index,
+    size_t length)
+{
+  if (index + length == data->size()) {
+    return Nothing();
+  }
+  return _write(fd, data, index + length);
+}
+
+
+// Writes '*data' from offset 'index' to 'fd' and continues in __write.
+Future<Nothing> _write(
+    int fd,
+    Owned<string> data,
+    size_t index)
+{
+  // See the C++11 variant above for why const_cast is needed here.
+  char* remaining = const_cast<char*>(data->data() + index);
+
+  return io::write(fd, remaining, data->size() - index)
+    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+// Performs one read-then-write round of a splice from 'from' to 'to'
+// and schedules the next round. It completes 'promise' on EOF (a read
+// of 0 bytes) and fails it when a read or write fails. It discards it
+// when a discard was requested or a read/write future was discarded.
+// 'data' is the scratch buffer for the reads (allocated with 'chunk'
+// bytes by splice()).
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing>> promise)
+{
+  // Stop splicing if a discard occurred on our future.
+  if (promise->future().hasDiscard()) {
+    // TODO(benh): Consider returning the number of bytes already
+    // spliced on discarded, or a failure. Same for the 'onDiscarded'
+    // callbacks below.
+    promise->discard();
+    return;
+  }
+
+  // Note that only one of io::read or io::write is outstanding at any
+  // one point in time thus the reuse of 'data' for both operations.
+
+  Future<size_t> read = io::read(from, data.get(), chunk);
+
+  // Stop reading (or potentially indefinitely polling) if a discard
+  // occurs on our future.
+  // NOTE(review): a new onDiscard callback is registered on every
+  // round. If Future retains its callbacks until it completes, this
+  // grows with the length of the splice; confirm against future.hpp.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<size_t>,
+                   WeakFuture<size_t>(read)));
+
+  read
+    .onReady([=] (size_t size) {
+      if (size == 0) { // EOF.
+        promise->set(Nothing());
+      } else {
+        // Note that we always try and complete the write, even if a
+        // discard has occurred on our future, in order to provide
+        // semantics where everything read is written. The promise
+        // will eventually be discarded in the next read.
+        io::write(to, string(data.get(), size))
+          .onReady([=] () { _splice(from, to, chunk, data, promise); })
+          .onFailed([=] (const string& message) { promise->fail(message); })
+          .onDiscarded([=] () { promise->discard(); });
+      }
+    })
+    .onFailed([=] (const string& message) { promise->fail(message); })
+    .onDiscarded([=] () { promise->discard(); });
+}
+#else
+// Forward declarations.
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size);
+
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message);
+
+void ____splice(
+    memory::shared_ptr<Promise<Nothing> > promise);
+
+
+// C++03 variant of the C++11 _splice: one read round whose outcome is
+// dispatched to __splice (ready), ___splice (failed) or ____splice
+// (discarded).
+void _splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise)
+{
+  // Stop splicing if a discard occurred on our future.
+  if (promise->future().hasDiscard()) {
+    // TODO(benh): Consider returning the number of bytes already
+    // spliced on discarded, or a failure. Same for the 'onDiscarded'
+    // callbacks below.
+    promise->discard();
+    return;
+  }
+
+  Future<size_t> read = io::read(from, data.get(), chunk);
+
+  // Stop reading (or potentially indefinitely polling) if a discard
+  // occurs on our future.
+  promise->future().onDiscard(
+      lambda::bind(&process::internal::discard<size_t>,
+                   WeakFuture<size_t>(read)));
+
+  read
+    .onReady(
+        lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+    .onDiscarded(lambda::bind(&____splice, promise));
+}
+
+
+// Handles a completed read of 'size' bytes. It completes 'promise' on
+// EOF, and otherwise writes those bytes to 'to' before starting the
+// next _splice round.
+void __splice(
+    int from,
+    int to,
+    size_t chunk,
+    boost::shared_array<char> data,
+    memory::shared_ptr<Promise<Nothing> > promise,
+    size_t size)
+{
+  if (size == 0) { // EOF.
+    promise->set(Nothing());
+  } else {
+    // Note that we always try and complete the write, even if a
+    // discard has occurred on our future, in order to provide
+    // semantics where everything read is written. The promise will
+    // eventually be discarded in the next read.
+    io::write(to, string(data.get(), size))
+      .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+      .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+      .onDiscarded(lambda::bind(&____splice, promise));
+  }
+}
+
+
+// Fails the splice with the failed read/write's 'message'.
+void ___splice(
+    memory::shared_ptr<Promise<Nothing> > promise,
+    const string& message)
+{
+  promise->fail(message);
+}
+
+
+// Discards the splice because a read/write future was discarded.
+void ____splice(
+    memory::shared_ptr<Promise<Nothing> > promise)
+{
+  promise->discard();
+}
+#endif // __cplusplus >= 201103L
+
+
+// Copies data from 'from' to 'to', at most 'chunk' bytes at a time,
+// until a read from 'from' returns 0 bytes (EOF). Both descriptors
+// must be non-blocking, since io::read/io::write fail otherwise.
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+  // A single explicit promise is threaded through every _splice round.
+  // Chaining each io::read/io::write as a future would make memory
+  // usage grow the longer the splice runs.
+  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+
+  const Future<Nothing> future = promise->future();
+
+  // Scratch buffer shared by all rounds of the splice.
+  boost::shared_array<char> buffer(new char[chunk]);
+
+  _splice(from, to, chunk, buffer, promise);
+
+  return future;
+}
+
+} // namespace internal {
+
+
+// Reads everything from 'fd' until EOF and returns it as a string.
+// The read works on a private duplicate of 'fd', made close-on-exec
+// and non-blocking, which is closed once the read finishes in any way.
+Future<string> read(int fd)
+{
+  process::initialize();
+
+  // Get our own copy of the file descriptor so that we're in control
+  // of the lifetime and don't crash if/when someone accidentally
+  // closes the file descriptor before discarding this future. We can
+  // also make sure it's non-blocking and will close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  const int duplicate = dup(fd);
+  if (duplicate == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(duplicate);
+  if (cloexec.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(duplicate);
+  if (nonblock.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
+  // TODO(bmahler): For efficiency, use a rope for the buffer.
+  memory::shared_ptr<string> buffer(new string());
+  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
+
+  Future<string> contents =
+    internal::_read(duplicate, buffer, data, BUFFERED_READ_SIZE);
+
+  // Close our duplicate once reading is over, whatever the outcome.
+  return contents.onAny(lambda::bind(&os::close, duplicate));
+}
+
+
+// Writes all of 'data' to 'fd'. The write works on a private
+// duplicate of 'fd', made close-on-exec and non-blocking, which is
+// closed once the write finishes in any way.
+Future<Nothing> write(int fd, const std::string& data)
+{
+  process::initialize();
+
+  // Get our own copy of the file descriptor so that we're in control
+  // of the lifetime and don't crash if/when someone accidentally
+  // closes the file descriptor before discarding this future. We can
+  // also make sure it's non-blocking and will close-on-exec. Start by
+  // checking we've got a "valid" file descriptor before dup'ing.
+  if (fd < 0) {
+    return Failure(strerror(EBADF));
+  }
+
+  const int duplicate = dup(fd);
+  if (duplicate == -1) {
+    return Failure(ErrnoError("Failed to duplicate file descriptor"));
+  }
+
+  // Set the close-on-exec flag.
+  Try<Nothing> cloexec = os::cloexec(duplicate);
+  if (cloexec.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to set close-on-exec on duplicated file descriptor: " +
+        cloexec.error());
+  }
+
+  // Make the file descriptor non-blocking.
+  Try<Nothing> nonblock = os::nonblock(duplicate);
+  if (nonblock.isError()) {
+    os::close(duplicate);
+    return Failure(
+        "Failed to make duplicated file descriptor non-blocking: " +
+        nonblock.error());
+  }
+
+  // Keep our own copy of 'data' alive for the duration of the write.
+  Owned<string> copy(new string(data));
+
+  Future<Nothing> written = internal::_write(duplicate, copy, 0);
+
+  // Close our duplicate once writing is over, whatever the outcome.
+  return written.onAny(lambda::bind(&os::close, duplicate));
+}
+
+
+// Splices everything read from 'from' into 'to' in pieces of at most
+// 'chunk' bytes. If 'to' is None the data goes to /dev/null. Both
+// descriptors are duplicated (and made close-on-exec and non-blocking)
+// so their lifetime is under our control. The duplicates are closed
+// when the splice finishes, or on any setup failure.
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+{
+  // Make sure we've got "valid" file descriptors.
+  if (from < 0 || (to.isSome() && to.get() < 0)) {
+    return Failure(strerror(EBADF));
+  }
+
+  if (to.isNone()) {
+    // Open up /dev/null that we can splice into.
+    Try<int> open = os::open("/dev/null", O_WRONLY);
+
+    if (open.isError()) {
+      return Failure("Failed to open /dev/null for writing: " + open.error());
+    }
+
+    to = open.get();
+  } else {
+    // Duplicate 'to' so that we're in control of its lifetime.
+    int fd = dup(to.get());
+    if (fd == -1) {
+      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
+    }
+
+    to = fd;
+  }
+
+  CHECK_SOME(to);
+
+  // Duplicate 'from' so that we're in control of its lifetime.
+  from = dup(from);
+  if (from == -1) {
+    // 'to' now refers to a descriptor we opened/duplicated above, so it
+    // must be closed here or it leaks. Capture errno (via ErrnoError)
+    // before close() has a chance to overwrite it.
+    ErrnoError error("Failed to duplicate 'from' file descriptor");
+    os::close(to.get());
+    return Failure(error);
+  }
+
+  // Set the close-on-exec flag (no-op if already set).
+  Try<Nothing> cloexec = os::cloexec(from);
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
+  }
+
+  cloexec = os::cloexec(to.get());
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
+  }
+
+  // Make the file descriptors non-blocking (no-op if already set).
+  Try<Nothing> nonblock = os::nonblock(from);
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
+  }
+
+  nonblock = os::nonblock(to.get());
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
+  }
+
+  return internal::splice(from, to.get(), chunk)
+    .onAny(lambda::bind(&os::close, from))
+    .onAny(lambda::bind(&os::close, to.get()));
+}
+
+} // namespace io {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 1fc8874..aeaac0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -41,8 +41,6 @@
 #include <stdexcept>
 #include <vector>
 
-#include <boost/shared_array.hpp>
-
 #include <process/check.hpp>
 #include <process/clock.hpp>
 #include <process/defer.hpp>
@@ -3115,637 +3113,6 @@ void post(const UPID& from,
 }
 
 
-namespace io {
-namespace internal {
-
-// Attempts one non-blocking ::read of up to 'size' bytes from 'fd'
-// into 'data' and completes 'promise' with the result. On
-// EINTR/EAGAIN/EWOULDBLOCK it re-arms itself via io::poll.
-void read(
-    int fd,
-    void* data,
-    size_t size,
-    const memory::shared_ptr<Promise<size_t> >& promise,
-    const Future<short>& future)
-{
-  // Ignore this function if the read operation has been discarded.
-  if (promise->future().hasDiscard()) {
-    CHECK(!future.isPending());
-    promise->discard();
-    return;
-  }
-
-  if (size == 0) {
-    promise->set(0);
-    return;
-  }
-
-  if (future.isDiscarded()) {
-    promise->fail("Failed to poll: discarded future");
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else {
-    ssize_t length = ::read(fd, data, size);
-    if (length < 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        // Restart the read operation.
-        Future<short> future =
-          io::poll(fd, process::io::READ).onAny(
-              lambda::bind(&internal::read,
-                           fd,
-                           data,
-                           size,
-                           promise,
-                           lambda::_1));
-
-        // Stop polling if a discard occurs on our future.
-        promise->future().onDiscard(
-            lambda::bind(&process::internal::discard<short>,
-                         WeakFuture<short>(future)));
-      } else {
-        // Error occurred.
-        promise->fail(strerror(errno));
-      }
-    } else {
-      promise->set(length);
-    }
-  }
-}
-
-
-// Attempts one non-blocking ::write of up to 'size' bytes from 'data'
-// to 'fd' and completes 'promise' with the result. SIGPIPE is blocked
-// around the write so that EPIPE is reported as a failure.
-void write(
-    int fd,
-    void* data,
-    size_t size,
-    const memory::shared_ptr<Promise<size_t> >& promise,
-    const Future<short>& future)
-{
-  // Ignore this function if the write operation has been discarded.
-  if (promise->future().hasDiscard()) {
-    promise->discard();
-    return;
-  }
-
-  if (size == 0) {
-    promise->set(0);
-    return;
-  }
-
-  if (future.isDiscarded()) {
-    promise->fail("Failed to poll: discarded future");
-  } else if (future.isFailed()) {
-    promise->fail(future.failure());
-  } else {
-    // Do a write but ignore SIGPIPE so we can return an error when
-    // writing to a pipe or socket where the reading end is closed.
-    // TODO(benh): The 'suppress' macro failed to work on OS X as it
-    // appears that signal delivery was happening asynchronously.
-    // That is, the signal would not appear to be pending when the
-    // 'suppress' block was closed thus the destructor for
-    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
-    // It also appeared that the signal would be delivered to another
-    // thread even if it remained blocked in this thread. The
-    // workaround here is to check explicitly for EPIPE and then do
-    // 'sigwait' regardless of what 'os::signals::pending' returns. We
-    // don't have that luxury with 'Suppressor' and arbitrary signals
-    // because we don't always have something like EPIPE to tell us
-    // that a signal is (or will soon be) pending.
-    bool pending = os::signals::pending(SIGPIPE);
-    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
-
-    ssize_t length = ::write(fd, data, size);
-
-    // Save the errno so we can restore it after doing sig* functions
-    // below.
-    int errno_ = errno;
-
-    if (length < 0 && errno == EPIPE && !pending) {
-      sigset_t mask;
-      sigemptyset(&mask);
-      sigaddset(&mask, SIGPIPE);
-
-      // NOTE(review): POSIX sigwait() returns an error number rather
-      // than -1 with errno set, so this retry condition may never be
-      // true; confirm.
-      int result;
-      do {
-        int ignored;
-        result = sigwait(&mask, &ignored);
-      } while (result == -1 && errno == EINTR);
-    }
-
-    if (unblock) {
-      os::signals::unblock(SIGPIPE);
-    }
-
-    errno = errno_;
-
-    if (length < 0) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        // Restart the write operation.
-        Future<short> future =
-          io::poll(fd, process::io::WRITE).onAny(
-              lambda::bind(&internal::write,
-                           fd,
-                           data,
-                           size,
-                           promise,
-                           lambda::_1));
-
-        // Stop polling if a discard occurs on our future.
-        promise->future().onDiscard(
-            lambda::bind(&process::internal::discard<short>,
-                         WeakFuture<short>(future)));
-      } else {
-        // Error occurred.
-        promise->fail(strerror(errno));
-      }
-    } else {
-      // TODO(benh): Retry if 'length' is 0?
-      promise->set(length);
-    }
-  }
-}
-
-} // namespace internal {
-
-
-// Starts an asynchronous read of at most 'size' bytes from 'fd' into
-// 'data'. It fails if 'fd' is invalid or is not non-blocking.
-Future<size_t> read(int fd, void* data, size_t size)
-{
-  process::initialize();
-
-  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
-  // Check the file descriptor.
-  Try<bool> nonblock = os::isNonblock(fd);
-  if (nonblock.isError()) {
-    // The file descriptor is not valid (e.g., has been closed).
-    promise->fail(
-        "Failed to check if file descriptor was non-blocking: " +
-        nonblock.error());
-    return promise->future();
-  } else if (!nonblock.get()) {
-    // The file descriptor is not non-blocking.
-    promise->fail("Expected a non-blocking file descriptor");
-    return promise->future();
-  }
-
-  // Because the file descriptor is non-blocking, we call read()
-  // immediately. The read may in turn call poll if necessary,
-  // avoiding unnecessary polling. We also observed that for some
-  // combination of libev and Linux kernel versions, the poll would
-  // block for non-deterministically long periods of time. This may be
-  // fixed in a newer version of libev (we use 3.8 at the time of
-  // writing this comment).
-  internal::read(fd, data, size, promise, io::READ);
-
-  return promise->future();
-}
-
-
-// Starts an asynchronous write of at most 'size' bytes from 'data' to
-// 'fd'. It fails if 'fd' is invalid or is not non-blocking.
-Future<size_t> write(int fd, void* data, size_t size)
-{
-  process::initialize();
-
-  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
-  // Check the file descriptor.
-  Try<bool> nonblock = os::isNonblock(fd);
-  if (nonblock.isError()) {
-    // The file descriptor is not valid (e.g., has been closed).
-    promise->fail(
-        "Failed to check if file descriptor was non-blocking: " +
-        nonblock.error());
-    return promise->future();
-  } else if (!nonblock.get()) {
-    // The file descriptor is not non-blocking.
-    promise->fail("Expected a non-blocking file descriptor");
-    return promise->future();
-  }
-
-  // Because the file descriptor is non-blocking, we call write()
-  // immediately. The write may in turn call poll if necessary,
-  // avoiding unnecessary polling. We also observed that for some
-  // combination of libev and Linux kernel versions, the poll would
-  // block for non-deterministically long periods of time. This may be
-  // fixed in a newer version of libev (we use 3.8 at the time of
-  // writing this comment).
-  internal::write(fd, data, size, promise, io::WRITE);
-
-  return promise->future();
-}
-
-
-namespace internal {
-
-#if __cplusplus >= 201103L
-// Reads 'fd' chunk by chunk into 'buffer' until a read returns 0
-// (EOF), then yields a copy of the accumulated contents.
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  return io::read(fd, data.get(), length)
-    .then([=] (size_t size) -> Future<string> {
-      if (size == 0) { // EOF.
-        return string(*buffer);
-      }
-      buffer->append(data.get(), size);
-      return _read(fd, buffer, data, length);
-    });
-}
-#else
-// Forward declaration.
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length);
-
-
-// Continuation of _read: returns the buffer on EOF, else keeps reading.
-Future<string> __read(
-    size_t size,
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  if (size == 0) { // EOF.
-    return string(*buffer);
-  }
-
-  buffer->append(data.get(), size);
-
-  return _read(fd, buffer, data, length);
-}
-
-
-Future<string> _read(
-    int fd,
-    const memory::shared_ptr<string>& buffer,
-    const boost::shared_array<char>& data,
-    size_t length)
-{
-  return io::read(fd, data.get(), length)
-    .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-// Writes '*data' from offset 'index' to 'fd', re-issuing io::write on
-// the remainder after each partial write.
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index)
-{
-  return io::write(fd, (void*) (data->data() + index), data->size() - index)
-    .then([=] (size_t length) -> Future<Nothing> {
-      if (index + length == data->size()) {
-        return Nothing();
-      }
-      return _write(fd, data, index + length);
-    });
-}
-#else
-// Forward declaration.
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index);
-
-
-// Continuation of _write: done when the end of '*data' is reached.
-Future<Nothing> __write(
-    int fd,
-    Owned<string> data,
-    size_t index,
-    size_t length)
-{
-  if (index + length == data->size()) {
-    return Nothing();
-  }
-  return _write(fd, data, index + length);
-}
-
-
-Future<Nothing> _write(
-    int fd,
-    Owned<string> data,
-    size_t index)
-{
-  return io::write(fd, (void*) (data->data() + index), data->size() - index)
-    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-// Performs one read-then-write round of a splice and schedules the
-// next one. It completes 'promise' on EOF, fails it on a read/write
-// failure, and discards it on a discard.
-void _splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing>> promise)
-{
-  // Stop splicing if a discard occurred on our future.
-  if (promise->future().hasDiscard()) {
-    // TODO(benh): Consider returning the number of bytes already
-    // spliced on discarded, or a failure. Same for the 'onDiscarded'
-    // callbacks below.
-    promise->discard();
-    return;
-  }
-
-  // Note that only one of io::read or io::write is outstanding at any
-  // one point in time thus the reuse of 'data' for both operations.
-
-  Future<size_t> read = io::read(from, data.get(), chunk);
-
-  // Stop reading (or potentially indefinitely polling) if a discard
-  // occurs on our future.
-  promise->future().onDiscard(
-      lambda::bind(&process::internal::discard<size_t>,
-                   WeakFuture<size_t>(read)));
-
-  read
-    .onReady([=] (size_t size) {
-      if (size == 0) { // EOF.
-        promise->set(Nothing());
-      } else {
-        // Note that we always try and complete the write, even if a
-        // discard has occurred on our future, in order to provide
-        // semantics where everything read is written. The promise
-        // will eventually be discarded in the next read.
-        io::write(to, string(data.get(), size))
-          .onReady([=] () { _splice(from, to, chunk, data, promise); })
-          .onFailed([=] (const string& message) { promise->fail(message); })
-          .onDiscarded([=] () { promise->discard(); });
-      }
-    })
-    .onFailed([=] (const string& message) { promise->fail(message); })
-    .onDiscarded([=] () { promise->discard(); });
-}
-#else
-// Forward declarations.
-void __splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise,
-    size_t size);
-
-void ___splice(
-    memory::shared_ptr<Promise<Nothing> > promise,
-    const string& message);
-
-void ____splice(
-    memory::shared_ptr<Promise<Nothing> > promise);
-
-
-// C++03 variant: the read outcome is dispatched to __splice (ready),
-// ___splice (failed) or ____splice (discarded).
-void _splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise)
-{
-  // Stop splicing if a discard occurred on our future.
-  if (promise->future().hasDiscard()) {
-    // TODO(benh): Consider returning the number of bytes already
-    // spliced on discarded, or a failure. Same for the 'onDiscarded'
-    // callbacks below.
-    promise->discard();
-    return;
-  }
-
-  Future<size_t> read = io::read(from, data.get(), chunk);
-
-  // Stop reading (or potentially indefinitely polling) if a discard
-  // occurs on our future.
-  promise->future().onDiscard(
-      lambda::bind(&process::internal::discard<size_t>,
-                   WeakFuture<size_t>(read)));
-
-  read
-    .onReady(
-        lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
-    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
-    .onDiscarded(lambda::bind(&____splice, promise));
-}
-
-
-// Handles a completed read of 'size' bytes: finish on EOF, else write
-// the bytes and start the next round.
-void __splice(
-    int from,
-    int to,
-    size_t chunk,
-    boost::shared_array<char> data,
-    memory::shared_ptr<Promise<Nothing> > promise,
-    size_t size)
-{
-  if (size == 0) { // EOF.
-    promise->set(Nothing());
-  } else {
-    // Note that we always try and complete the write, even if a
-    // discard has occurred on our future, in order to provide
-    // semantics where everything read is written. The promise will
-    // eventually be discarded in the next read.
-    io::write(to, string(data.get(), size))
-      .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
-      .onFailed(lambda::bind(&___splice, promise, lambda::_1))
-      .onDiscarded(lambda::bind(&____splice, promise));
-  }
-}
-
-
-// Fails the splice with the failed read/write's message.
-void ___splice(
-    memory::shared_ptr<Promise<Nothing> > promise,
-    const string& message)
-{
-  promise->fail(message);
-}
-
-
-// Discards the splice because a read/write future was discarded.
-void ____splice(
-    memory::shared_ptr<Promise<Nothing> > promise)
-{
-  promise->discard();
-}
-#endif // __cplusplus >= 201103L
-
-
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
-  boost::shared_array<char> data(new char[chunk]);
-
-  // Rather than having internal::_splice return a future and
-  // implementing internal::_splice as a chain of io::read and
-  // io::write calls, we use an explicit promise that we pass around
-  // so that we don't increase memory usage the longer that we splice.
-  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
-
-  Future<Nothing> future = promise->future();
-
-  _splice(from, to, chunk, data, promise);
-
-  return future;
-}
-
-} // namespace internal {
-
-
-Future<string> read(int fd)
-{
-  process::initialize();
-
-  // Get our own copy of the file descriptor so that we're in control
-  // of the lifetime and don't crash if/when someone by accidently
-  // closes the file descriptor before discarding this future. We can
-  // also make sure it's non-blocking and will close-on-exec. Start by
-  // checking we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  fd = dup(fd);
-  if (fd == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // Set the close-on-exec flag.
-  Try<Nothing> cloexec = os::cloexec(fd);
-  if (cloexec.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // Make the file descriptor is non-blocking.
-  Try<Nothing> nonblock = os::nonblock(fd);
-  if (nonblock.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
-  // TODO(bmahler): For efficiency, use a rope for the buffer.
-  memory::shared_ptr<string> buffer(new string());
-  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
-
-  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
-    .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> write(int fd, const std::string& data)
-{
-  process::initialize();
-
-  // Get our own copy of the file descriptor so that we're in control
-  // of the lifetime and don't crash if/when someone by accidently
-  // closes the file descriptor before discarding this future. We can
-  // also make sure it's non-blocking and will close-on-exec. Start by
-  // checking we've got a "valid" file descriptor before dup'ing.
-  if (fd < 0) {
-    return Failure(strerror(EBADF));
-  }
-
-  fd = dup(fd);
-  if (fd == -1) {
-    return Failure(ErrnoError("Failed to duplicate file descriptor"));
-  }
-
-  // Set the close-on-exec flag.
-  Try<Nothing> cloexec = os::cloexec(fd);
-  if (cloexec.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // Make the file descriptor is non-blocking.
-  Try<Nothing> nonblock = os::nonblock(fd);
-  if (nonblock.isError()) {
-    os::close(fd);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-
-  return internal::_write(fd, Owned<string>(new string(data)), 0)
-    .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
-{
-  // Make sure we've got "valid" file descriptors.
-  if (from < 0 || (to.isSome() && to.get() < 0)) {
-    return Failure(strerror(EBADF));
-  }
-
-  if (to.isNone()) {
-    // Open up /dev/null that we can splice into.
-    Try<int> open = os::open("/dev/null", O_WRONLY);
-
-    if (open.isError()) {
-      return Failure("Failed to open /dev/null for writing: " + open.error());
-    }
-
-    to = open.get();
-  } else {
-    // Duplicate 'to' so that we're in control of its lifetime.
-    int fd = dup(to.get());
-    if (fd == -1) {
-      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
-    }
-
-    to = fd;
-  }
-
-  CHECK_SOME(to);
-
-  // Duplicate 'from' so that we're in control of its lifetime.
-  from = dup(from);
-  if (from == -1) {
-    return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
-  }
-
-  // Set the close-on-exec flag (no-op if already set).
-  Try<Nothing> cloexec = os::cloexec(from);
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
-  }
-
-  cloexec = os::cloexec(to.get());
-  if (cloexec.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
-  }
-
-  // Make the file descriptors non-blocking (no-op if already set).
-  Try<Nothing> nonblock = os::nonblock(from);
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
-  }
-
-  nonblock = os::nonblock(to.get());
-  if (nonblock.isError()) {
-    os::close(from);
-    os::close(to.get());
-    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
-  }
-
-  return internal::splice(from, to.get(), chunk)
-    .onAny(lambda::bind(&os::close, from))
-    .onAny(lambda::bind(&os::close, to.get()));
-}
-
-} // namespace io {
-
-
 namespace inject {
 
 bool exited(const UPID& from, const UPID& to)


[26/30] mesos git commit: Add bind(), listen(), and accept() to Socket interface.

Posted by be...@apache.org.
Add bind(), listen(), and accept() to Socket interface.

Review: https://reviews.apache.org/r/27966


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8279b45e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8279b45e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8279b45e

Branch: refs/heads/master
Commit: 8279b45ed325375bbf8cc5919fd7009c7d2335cf
Parents: 8edab65
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:55:07 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  21 ++
 3rdparty/libprocess/src/libev.cpp              |   1 -
 3rdparty/libprocess/src/libev.hpp              |   3 -
 3rdparty/libprocess/src/process.cpp            | 235 +++++++++++---------
 4 files changed, 153 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 5fd8d1b..c022924 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -82,6 +82,12 @@ public:
 
     Future<size_t> sendfile(int fd, off_t offset, size_t size);
 
+    Try<Node> bind(const Node& node);
+
+    Try<Nothing> listen(int backlog);
+
+    Future<Socket> accept();
+
   private:
     const Impl& create() const
     {
@@ -143,6 +149,21 @@ public:
     return impl->sendfile(fd, offset, size);
   }
 
+  Try<Node> bind(const Node& node)
+  {
+    return impl->bind(node);
+  }
+
+  Try<Nothing> listen(int backlog)
+  {
+    return impl->listen(backlog);
+  }
+
+  Future<Socket> accept()
+  {
+    return impl->accept();
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index efc89d8..6560050 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -12,7 +12,6 @@ namespace process {
 // libev.hpp (since these need to live in the static data space).
 struct ev_loop* loop = NULL;
 ev_async async_watcher;
-ev_io server_watcher;
 std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
 synchronizable(watchers);
 std::queue<lambda::function<void(void)>>* functions =

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index bac8b6a..04847e3 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -18,9 +18,6 @@ extern struct ev_loop* loop;
 // with IO watchers and functions (via run_in_event_loop).
 extern ev_async async_watcher;
 
-// Server watcher for accepting connections.
-extern ev_io server_watcher;
-
 // Queue of I/O watchers to be asynchronously added to the event loop
 // (protected by 'watchers' below).
 // TODO(benh): Replace this queue with functions that we put in

http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9f91020..5d3b947 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -267,7 +267,7 @@ public:
   SocketManager();
   ~SocketManager();
 
-  Socket accepted(int s);
+  void accepted(const Socket& socket);
 
   void link(ProcessBase* process, const UPID& to);
 
@@ -433,8 +433,11 @@ const string Profiler::STOP_HELP = HELP(
 // Unique id that can be assigned to each process.
 static uint32_t __id__ = 0;
 
+// Server socket listen backlog.
+static const int LISTEN_BACKLOG = 500000;
+
 // Local server socket.
-static int __s__ = -1;
+static Socket __s__;
 
 // Local node.
 static Node __node__;
@@ -640,66 +643,6 @@ void decode_read(
 } // namespace internal {
 
 
-void accept(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  CHECK_EQ(__s__, watcher->fd);
-
-  sockaddr_in addr;
-  socklen_t addrlen = sizeof(addr);
-
-  int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
-
-  if (s < 0) {
-    return;
-  }
-
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return;
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return;
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-  } else {
-    // Inform the socket manager for proper bookkeeping.
-    Socket socket = socket_manager->accepted(s);
-
-    // Allocate a buffer to read into. This can be replaced later
-    // when socket supports a read function that provides the
-    // buffered data in the resulting callback.
-    const size_t size = 80 * 1024;
-    char* data = new char[size];
-    memset(data, 0, size);
-
-    DataDecoder* decoder = new DataDecoder(socket);
-
-    socket.read(data, size)
-      .onAny(lambda::bind(
-          &internal::decode_read,
-          lambda::_1,
-          data,
-          size,
-          new Socket(socket),
-          decoder));
-  }
-}
-
-
 void* serve(void* arg)
 {
   ev_loop(((struct ev_loop*) arg), 0);
@@ -765,6 +708,37 @@ void timedout(list<Timer>&& timers)
 // }
 
 
+namespace internal {
+
+void on_accept(const Future<Socket>& socket)
+{
+  if (socket.isReady()) {
+    // Inform the socket manager for proper bookkeeping.
+    socket_manager->accepted(socket.get());
+
+    const size_t size = 80 * 1024;
+    char* data = new char[size];
+    memset(data, 0, size);
+
+    DataDecoder* decoder = new DataDecoder(socket.get());
+
+    socket.get().read(data, size)
+      .onAny(lambda::bind(
+          &internal::decode_read,
+          lambda::_1,
+          data,
+          size,
+          new Socket(socket.get()),
+          decoder));
+  }
+
+  __s__.accept()
+    .onAny(lambda::bind(&on_accept, lambda::_1));
+}
+
+} // namespace internal {
+
+
 void initialize(const string& delegate)
 {
   // TODO(benh): Return an error if attempting to initialize again
@@ -866,21 +840,6 @@ void initialize(const string& delegate)
   }
 
   // Create a "server" socket for communicating with other nodes.
-  if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-    PLOG(FATAL) << "Failed to initialize, socket";
-  }
-
-  // Make socket non-blocking.
-  Try<Nothing> nonblock = os::nonblock(__s__);
-  if (nonblock.isError()) {
-    LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
-  }
-
-  // Set FD_CLOEXEC flag.
-  Try<Nothing> cloexec = os::cloexec(__s__);
-  if (cloexec.isError()) {
-    LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
-  }
 
   // Allow address reuse.
   int on = 1;
@@ -888,25 +847,12 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
   }
 
-  // Set up socket.
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_addr.s_addr = __node__.ip;
-  addr.sin_port = htons(__node__.port);
-
-  if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
-    PLOG(FATAL) << "Failed to initialize, bind " << __node__;
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  socklen_t addrlen = sizeof(addr);
-  if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
-    PLOG(FATAL) << "Failed to initialize, getsockname";
+  Try<Node> bind = __s__.bind(__node__);
+  if (bind.isError()) {
+    PLOG(FATAL) << "Failed to initialize: " << bind.error();
   }
 
-  __node__.ip = addr.sin_addr.s_addr;
-  __node__.port = ntohs(addr.sin_port);
+  __node__ = bind.get();
 
   // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
   // actually have a valid external ip address. Note that we need only
@@ -931,8 +877,9 @@ void initialize(const string& delegate)
     __node__.ip = *((uint32_t *) he->h_addr_list[0]);
   }
 
-  if (listen(__s__, 500000) < 0) {
-    PLOG(FATAL) << "Failed to initialize, listen";
+  Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+  if (listen.isError()) {
+    PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
   // Initialize libev.
@@ -950,9 +897,6 @@ void initialize(const string& delegate)
   ev_async_init(&async_watcher, handle_async);
   ev_async_start(loop, &async_watcher);
 
-  ev_io_init(&server_watcher, accept, __s__, EV_READ);
-  ev_io_start(loop, &server_watcher);
-
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -979,6 +923,9 @@ void initialize(const string& delegate)
   // 'spawn' below for the garbage collector.
   initializing = false;
 
+  __s__.accept()
+    .onAny(lambda::bind(&internal::on_accept, lambda::_1));
+
   // TODO(benh): Make sure creating the garbage collector, logging
   // process, and profiler always succeeds and use supervisors to make
   // sure that none terminate.
@@ -1436,6 +1383,90 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
 }
 
 
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = PF_INET;
+  addr.sin_addr.s_addr = node.ip;
+  addr.sin_port = htons(node.port);
+
+  if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+    return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) +
+                 ":" + stringify(node.port));
+  }
+
+  // Lookup and store assigned ip and assigned port.
+  socklen_t addrlen = sizeof(addr);
+  if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) {
+    return ErrnoError("Failed to bind, getsockname");
+  }
+
+  return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+  sockaddr_in addr;
+  socklen_t addrlen = sizeof(addr);
+
+  int s = ::accept(fd, (sockaddr*) &addr, &addrlen);
+
+  if (s < 0) {
+    return Failure(ErrnoError("Failed to accept"));
+  }
+
+  Try<Nothing> nonblock = os::nonblock(s);
+  if (nonblock.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblock.error();
+    os::close(s);
+    return Failure("Failed to accept, nonblock: " + nonblock.error());
+  }
+
+  Try<Nothing> cloexec = os::cloexec(s);
+  if (cloexec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << cloexec.error();
+    os::close(s);
+    return Failure("Failed to accept, cloexec: " + cloexec.error());
+  }
+
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+  int on = 1;
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    os::close(s);
+    return Failure(
+      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  }
+
+  return Socket(s);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1445,13 +1476,11 @@ SocketManager::SocketManager()
 SocketManager::~SocketManager() {}
 
 
-Socket SocketManager::accepted(int s)
+void SocketManager::accepted(const Socket& socket)
 {
   synchronized (this) {
-    return sockets[s] = Socket(s);
+    sockets[socket] = socket;
   }
-
-  UNREACHABLE();
 }
 
 


[20/30] mesos git commit: Introduce std::make_shared configure check in libprocess.

Posted by be...@apache.org.
Introduce std::make_shared configure check in libprocess.

Review: https://reviews.apache.org/r/27920


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b7f7c98b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b7f7c98b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b7f7c98b

Branch: refs/heads/master
Commit: b7f7c98bd04f2a1c62cc3c034572d779ccc0e8fe
Parents: fcdae27
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:34:50 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b7f7c98b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
index eebb4bd..d48a96e 100644
--- a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
@@ -64,6 +64,8 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
   std::unique_ptr<int> i(new int());
   int j = foo(std::move(i));
 
+  std::shared_ptr<int> k = std::make_shared<int>(2);
+
   void mutexTest()
   {
     std::mutex mutex;


[05/30] mesos git commit: Introduced a libev async watcher specifically for updating timers.

Posted by be...@apache.org.
Introduced a libev async watcher specifically for updating timers.

Review: https://reviews.apache.org/r/27502


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/18cc45f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/18cc45f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/18cc45f1

Branch: refs/heads/master
Commit: 18cc45f1f0c27e1766be9c1227c24d63c14bbcb0
Parents: e1ef91f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 16:49:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/18cc45f1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2282e9b..a0b4ca0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -449,9 +449,14 @@ static ProcessManager* process_manager = NULL;
 // Event loop.
 static struct ev_loop* loop = NULL;
 
-// Asynchronous watcher for interrupting loop.
+// Asynchronous watcher for interrupting loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
 static ev_async async_watcher;
 
+// Asynchronous watcher for interrupting loop to specifically deal
+// with updating timers.
+static ev_async async_update_timer_watcher;
+
 // Watcher for timeouts.
 static ev_timer timeouts_watcher;
 
@@ -609,7 +614,7 @@ void Clock::resume()
       clock::settling = false;
       clock::currents->clear();
       update_timer = true;
-      ev_async_send(loop, &async_watcher);
+      ev_async_send(loop, &async_update_timer_watcher);
     }
   }
 }
@@ -624,7 +629,7 @@ void Clock::advance(const Duration& duration)
       VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
       if (!update_timer) {
         update_timer = true;
-        ev_async_send(loop, &async_watcher);
+        ev_async_send(loop, &async_update_timer_watcher);
       }
     }
   }
@@ -655,7 +660,7 @@ void Clock::update(const Time& time)
         VLOG(2) << "Clock updated to " << clock::current;
         if (!update_timer) {
           update_timer = true;
-          ev_async_send(loop, &async_watcher);
+          ev_async_send(loop, &async_update_timer_watcher);
         }
       }
     }
@@ -866,7 +871,11 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
       functions->pop();
     }
   }
+}
 
+
+void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+{
   synchronized (timeouts) {
     if (update_timer) {
       if (!timeouts->empty()) {
@@ -1619,6 +1628,9 @@ void initialize(const string& delegate)
   ev_async_init(&async_watcher, handle_async);
   ev_async_start(loop, &async_watcher);
 
+  ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
+  ev_async_start(loop, &async_update_timer_watcher);
+
   ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
   ev_timer_again(loop, &timeouts_watcher);
 
@@ -3226,7 +3238,7 @@ Timer Clock::timer(
       // Need to interrupt the loop to update/set timer repeat.
       (*timeouts)[timer.timeout().time()].push_back(timer);
       update_timer = true;
-      ev_async_send(loop, &async_watcher);
+      ev_async_send(loop, &async_update_timer_watcher);
     } else {
       // Timer repeat is adequate, just add the timeout.
       CHECK(timeouts->size() >= 1);


[03/30] mesos git commit: Replaced Timer::create/cancel with Clock::timer/cancel.

Posted by be...@apache.org.
Replaced Timer::create/cancel with Clock::timer/cancel.

Review: https://reviews.apache.org/r/27496


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/303ee1c3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/303ee1c3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/303ee1c3

Branch: refs/heads/master
Commit: 303ee1c3f23c0bb2cc6cfd58982848ea225ad2df
Parents: acd656c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 1 16:00:53 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/c++11/delay.hpp |  5 +++--
 3rdparty/libprocess/include/process/clock.hpp       | 16 +++++++++++++++-
 3rdparty/libprocess/include/process/delay.hpp       |  5 +++--
 3rdparty/libprocess/include/process/future.hpp      |  5 +++--
 3rdparty/libprocess/include/process/timer.hpp       | 11 ++++-------
 3rdparty/libprocess/src/process.cpp                 |  4 ++--
 6 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/c++11/delay.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/c++11/delay.hpp b/3rdparty/libprocess/include/process/c++11/delay.hpp
index 5f686db..5818e83 100644
--- a/3rdparty/libprocess/include/process/c++11/delay.hpp
+++ b/3rdparty/libprocess/include/process/c++11/delay.hpp
@@ -1,6 +1,7 @@
 #ifndef __PROCESS_DELAY_HPP__
 #define __PROCESS_DELAY_HPP__
 
+#include <process/clock.hpp>
 #include <process/dispatch.hpp>
 #include <process/timer.hpp>
 
@@ -19,7 +20,7 @@ Timer delay(const Duration& duration,
             const PID<T>& pid,
             void (T::*method)())
 {
-  return Timer::create(duration, [=] () {
+  return Clock::timer(duration, [=] () {
     dispatch(pid, method);
   });
 }
@@ -52,7 +53,7 @@ Timer delay(const Duration& duration,
               void (T::*method)(ENUM_PARAMS(N, P)),                     \
               ENUM_BINARY_PARAMS(N, A, a))                              \
   {                                                                     \
-    return Timer::create(duration, [=] () {                             \
+    return Clock::timer(duration, [=] () {                              \
       dispatch(pid, method, ENUM_PARAMS(N, a));                         \
     });                                                                 \
   }                                                                     \

http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index eb157ca..80190ef 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -2,27 +2,41 @@
 #define __PROCESS_CLOCK_HPP__
 
 #include <process/time.hpp>
+#include <process/timer.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/lambda.hpp>
 
 namespace process {
 
-// Forward declarations.
+// Forward declarations (to avoid circular dependencies).
 class ProcessBase;
 class Time;
+class Timer;
 
 class Clock
 {
 public:
   static Time now();
   static Time now(ProcessBase* process);
+
+  static Timer timer(
+      const Duration& duration,
+      const lambda::function<void(void)>& thunk);
+
+  static bool cancel(const Timer& timer);
+
   static void pause();
   static bool paused();
+
   static void resume();
+
   static void advance(const Duration& duration);
   static void advance(ProcessBase* process, const Duration& duration);
+
   static void update(const Time& time);
   static void update(ProcessBase* process, const Time& time);
+
   static void order(ProcessBase* from, ProcessBase* to);
 
   // When the clock is paused, settle() synchronously ensures that:

http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/delay.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/delay.hpp b/3rdparty/libprocess/include/process/delay.hpp
index 487f652..40627ea 100644
--- a/3rdparty/libprocess/include/process/delay.hpp
+++ b/3rdparty/libprocess/include/process/delay.hpp
@@ -4,6 +4,7 @@
 #ifndef __PROCESS_DELAY_HPP__
 #define __PROCESS_DELAY_HPP__
 
+#include <process/clock.hpp>
 #include <process/dispatch.hpp>
 #include <process/timer.hpp>
 
@@ -40,7 +41,7 @@ Timer delay(const Duration& duration,
                  dispatcher,
                  internal::canonicalize(method));
 
-  return Timer::create(duration, dispatch);
+  return Clock::timer(duration, dispatch);
 }
 
 
@@ -87,7 +88,7 @@ Timer delay(const Duration& duration,
                    dispatcher,                                          \
                    internal::canonicalize(method));                     \
                                                                         \
-    return Timer::create(duration, dispatch);                           \
+    return Clock::timer(duration, dispatch);                            \
   }                                                                     \
                                                                         \
   template <typename T,                                                 \

http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 3ac1812..68e5f7b 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -18,6 +18,7 @@
 #include <boost/type_traits.hpp>
 #endif // __cplusplus < 201103L
 
+#include <process/clock.hpp>
 #include <process/internal.hpp>
 #include <process/latch.hpp>
 #include <process/owned.hpp>
@@ -1491,7 +1492,7 @@ void after(
 {
   CHECK(!future.isPending());
   if (latch->trigger()) {
-    Timer::cancel(timer);
+    Clock::cancel(timer);
     promise->associate(future);
   }
 }
@@ -1554,7 +1555,7 @@ Future<T> Future<T>::after(
   // completed. Note that we do not pass a weak reference for this
   // future as we don't want the future to get cleaned up and then
   // have the timer expire.
-  Timer timer = Timer::create(
+  Timer timer = Clock::timer(
       duration,
       lambda::bind(&internal::expired<T>, f, latch, promise, *this));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/timer.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timer.hpp b/3rdparty/libprocess/include/process/timer.hpp
index e2f5563..e5d71f6 100644
--- a/3rdparty/libprocess/include/process/timer.hpp
+++ b/3rdparty/libprocess/include/process/timer.hpp
@@ -11,19 +11,14 @@
 
 namespace process {
 
-// Timer support!
+// Timer represents a delayed thunk, that can get created (scheduled)
+// and canceled using the Clock.
 
 class Timer
 {
 public:
   Timer() : id(0), pid(process::UPID()), thunk(&abort) {}
 
-  static Timer create(
-      const Duration& duration,
-      const lambda::function<void(void)>& thunk);
-
-  static bool cancel(const Timer& timer);
-
   bool operator == (const Timer& that) const
   {
     return id == that.id;
@@ -50,6 +45,8 @@ public:
   }
 
 private:
+  friend class Clock;
+
   Timer(long _id,
         const Timeout& _t,
         const process::UPID& _pid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 5842705..9ebac08 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3160,7 +3160,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
 }
 
 
-Timer Timer::create(
+Timer Clock::timer(
     const Duration& duration,
     const lambda::function<void(void)>& thunk)
 {
@@ -3194,7 +3194,7 @@ Timer Timer::create(
 }
 
 
-bool Timer::cancel(const Timer& timer)
+bool Clock::cancel(const Timer& timer)
 {
   bool canceled = false;
   synchronized (timeouts) {


[21/30] mesos git commit: Introduce std::make_shared configure check.

Posted by be...@apache.org.
Introduce std::make_shared configure check.

Review: https://reviews.apache.org/r/27921


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9eda4331
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9eda4331
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9eda4331

Branch: refs/heads/master
Commit: 9eda4331dd23c3646aba1ec710e0dd3190e579ab
Parents: b7f7c98
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:35:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 m4/ax_cxx_compile_stdcxx_11.m4 | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9eda4331/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/m4/ax_cxx_compile_stdcxx_11.m4 b/m4/ax_cxx_compile_stdcxx_11.m4
index 07e20bb..6a859b8 100644
--- a/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/m4/ax_cxx_compile_stdcxx_11.m4
@@ -80,6 +80,8 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
     p1->bar();
   }
 
+  std::shared_ptr<int> k = std::make_shared<int>(2);
+
   void mutexTest()
   {
     std::mutex _mutex;