You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/16 02:38:58 UTC
[01/30] mesos git commit: Abstract clock internals from
ProcessManager::settle.
Repository: mesos
Updated Branches:
refs/heads/master acd656c4e -> 8279b45ed
Abstract clock internals from ProcessManager::settle.
Review: https://reviews.apache.org/r/27498
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84efa184
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84efa184
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84efa184
Branch: refs/heads/master
Commit: 84efa184159ebaa7f5110073358d35773d149f0e
Parents: 788a136
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:03:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/clock.hpp | 22 +++-
3rdparty/libprocess/include/process/future.hpp | 22 +++-
3rdparty/libprocess/src/process.cpp | 117 ++++++++++++++------
3 files changed, 121 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index 80190ef..d60742a 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -39,11 +39,27 @@ public:
static void order(ProcessBase* from, ProcessBase* to);
- // When the clock is paused, settle() synchronously ensures that:
+ // When the clock is paused this returns only after
// (1) all expired timers are executed,
- // (2) no Processes are running, and
- // (3) no Processes are ready to run.
+ // (2) no processes are running, and
+ // (3) no processes are ready to run.
+ //
+ // In other words, this function blocks synchronously until no other
+ // execution on any processes will occur unless the clock is
+ // advanced.
+ //
+ // TODO(benh): Move this function elsewhere, for instance, to a
+ // top-level function in the 'process' namespace since it deals with
+ // both processes and the clock.
static void settle();
+
+ // When the clock is paused this returns true if all timers that
+ // expire before the paused time have executed, otherwise false.
+ // Note that if the clock continually gets advanced concurrently
+ // this function may never return true because the "paused" time
+ // will continue to get pushed out farther in the future making more
+ // timers candidates for execution.
+ static bool settled();
};
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 68e5f7b..2e4f9ef 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -1121,7 +1121,7 @@ bool Future<T>::hasDiscard() const
namespace internal {
-inline void awaited(const Owned<Latch>& latch)
+inline void awaited(Owned<Latch> latch)
{
latch->trigger();
}
@@ -1132,18 +1132,32 @@ inline void awaited(const Owned<Latch>& latch)
template <typename T>
bool Future<T>::await(const Duration& duration) const
{
- Owned<Latch> latch;
+ // NOTE: We need to preemptively allocate the Latch on the stack
+ // instead of lazily create it in the critical section below because
+ // instantiating a Latch requires creating a new process (at the
+ // time of writing this comment) which might need to do some
+ // synchronization in libprocess which might deadlock if some other
+ // code in libprocess is already holding a lock and then attempts to
+ // do Promise::set (or something similar) that attempts to acquire
+ // the lock that we acquire here. This is an artifact of using
+ // Future/Promise within the implementation of libprocess.
+ //
+ // We mostly only call 'await' in tests so this should not be a
+ // performance concern.
+ Owned<Latch> latch(new Latch());
+
+ bool pending = false;
internal::acquire(&data->lock);
{
if (data->state == PENDING) {
- latch.reset(new Latch());
+ pending = true;
data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
}
}
internal::release(&data->lock);
- if (latch.get() != NULL) {
+ if (pending) {
return latch->await(duration);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9ebac08..e7e5520 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -43,6 +43,7 @@
#include <boost/shared_array.hpp>
+#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -475,11 +476,6 @@ static queue<lambda::function<void(void)> >* functions =
static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-// For supporting Clock::settle(), true if timers have been removed
-// from 'timeouts' but may not have been executed yet. Protected by
-// the timeouts lock. This is only used when the clock is paused.
-static bool pending_timers = false;
-
// Flag to indicate whether or to update the timer on async interrupt.
static bool update_timer = false;
@@ -525,6 +521,11 @@ Duration advanced = Duration::zero();
bool paused = false;
+// For supporting Clock::settled(), false if we're not currently
+// settling (or we're not paused), true if we're currently attempting
+// to settle (and we're paused).
+bool settling = false;
+
} // namespace clock {
@@ -596,6 +597,7 @@ void Clock::resume()
if (clock::paused) {
VLOG(2) << "Clock resumed at " << clock::current;
clock::paused = false;
+ clock::settling = false;
clock::currents->clear();
update_timer = true;
ev_async_send(loop, &async_watcher);
@@ -667,6 +669,7 @@ void Clock::update(ProcessBase* process, const Time& time)
void Clock::order(ProcessBase* from, ProcessBase* to)
{
+ VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
update(to, now(from));
}
@@ -678,6 +681,29 @@ void Clock::settle()
}
+bool Clock::settled()
+{
+ synchronized (timeouts) {
+ CHECK(clock::paused);
+
+ if (update_timer) {
+ return false;
+ } else if (clock::settling) {
+ VLOG(3) << "Clock still not settled";
+ return false;
+ } else if (timeouts->size() == 0 ||
+ timeouts->begin()->first > clock::current) {
+ VLOG(3) << "Clock is settled";
+ return true;
+ }
+
+ VLOG(3) << "Clock is not settled";
+
+ return false;
+ }
+}
+
+
Try<Time> Time::create(double seconds)
{
Try<Duration> duration = Duration::create(seconds);
@@ -878,9 +904,12 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
VLOG(3) << "Have timeout(s) at " << timeout;
- // Record that we have pending timers to execute so the
- // Clock::settle() operation can wait until we're done.
- pending_timers = true;
+ // Need to toggle 'settling' so that we don't prematurely say
+ // we're settled until after the timers are executed below,
+ // outside of the critical section.
+ if (clock::paused) {
+ clock::settling = true;
+ }
foreach (const Timer& timer, (*timeouts)[timeout]) {
timedout.push_back(timer);
@@ -943,11 +972,16 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
timer();
}
- // Mark ourselves as done executing the timers since it's now safe
- // for a call to Clock::settle() to check if there will be any
- // future timeouts reached.
+ // Mark 'settling' as false since there are not any more timeouts
+ // that will expire before the paused time and we've finished
+ // executing expired timers.
synchronized (timeouts) {
- pending_timers = false;
+ if (clock::paused &&
+ (timeouts->size() == 0 ||
+ timeouts->begin()->first > clock::current)) {
+ VLOG(3) << "Clock has settled";
+ clock::settling = false;
+ }
}
}
@@ -2961,7 +2995,15 @@ bool ProcessManager::wait(const UPID& pid)
list<ProcessBase*>::iterator it =
find(runq.begin(), runq.end(), process);
if (it != runq.end()) {
+ // Found it! Remove it from the run queue since we'll be
+ // donating our thread and also increment 'running' before
+ // leaving this 'runq' protected critical section so that
+ // everyone that is waiting for the processes to settle
+ // continue to wait (otherwise they could see nothing in
+ // 'runq' and 'running' equal to 0 between when we exit
+ // this critical section and increment 'running').
runq.erase(it);
+ __sync_fetch_and_add(&running, 1);
} else {
// Another thread has resumed the process ...
process = NULL;
@@ -2977,7 +3019,6 @@ bool ProcessManager::wait(const UPID& pid)
if (process != NULL) {
VLOG(2) << "Donating thread to " << process->pid << " while waiting";
ProcessBase* donator = __process__;
- __sync_fetch_and_add(&running, 1);
process_manager->resume(process);
__process__ = donator;
}
@@ -3046,30 +3087,39 @@ void ProcessManager::settle()
{
bool done = true;
do {
+ // While refactoring in order to isolate libev behind abstractions
+ // it became evident that this os::sleep is vital for tests to
+ // pass. In particular, there are certain tests that assume too
+ // much before they attempt to do a settle. One such example is
+ // tests doing http::get followed by Clock::settle, where they
+ // expect the http::get will have properly enqueued a process on
+ // the run queue but http::get is just sending bytes on a
+ // socket. Without sleeping at the beginning of this function we
+ // can get unlucky and appear settled when in actuallity the
+ // kernel just hasn't copied the bytes to a socket or we haven't
+ // yet read the bytes and enqueued an event on a process (and the
+ // process on the run queue).
os::sleep(Milliseconds(10));
- done = true;
- // Hopefully this is the only place we acquire both these locks.
- synchronized (runq) {
- synchronized (timeouts) {
- CHECK(Clock::paused()); // Since another thread could resume the clock!
- if (!runq.empty()) {
- done = false;
- }
+ done = true; // Assume to start that we are settled.
- __sync_synchronize(); // Read barrier for 'running'.
- if (running > 0) {
- done = false;
- }
+ synchronized (runq) {
+ if (!runq.empty()) {
+ done = false;
+ continue;
+ }
- if (timeouts->size() > 0 &&
- timeouts->begin()->first <= clock::current) {
- done = false;
- }
+ // Read barrier for 'running'.
+ __sync_synchronize();
- if (pending_timers) {
- done = false;
- }
+ if (running > 0) {
+ done = false;
+ continue;
+ }
+
+ if (!Clock::settled()) {
+ done = false;
+ continue;
}
}
} while (!done);
@@ -3173,7 +3223,8 @@ Timer Clock::timer(
Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
- VLOG(3) << "Created a timer for " << timeout.time();
+ VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
+ << " in the future (" << timeout.time() << ")";
// Add the timer.
synchronized (timeouts) {
[02/30] mesos git commit: Updates to Mesos for Timer::create/cancel
-> Clock::timer/cancel.
Posted by be...@apache.org.
Updates to Mesos for Timer::create/cancel -> Clock::timer/cancel.
Review: https://reviews.apache.org/r/27497
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/788a136f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/788a136f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/788a136f
Branch: refs/heads/master
Commit: 788a136f3a43a32ce64f7b2ef35a1005f2528dbd
Parents: 303ee1c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:23:01 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800
----------------------------------------------------------------------
src/launcher/executor.cpp | 2 +-
src/log/catchup.cpp | 2 +-
src/master/master.cpp | 4 ++--
src/slave/gc.cpp | 2 +-
src/slave/slave.cpp | 6 +++---
src/zookeeper/group.cpp | 4 ++--
6 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index cbc8750..6175bf5 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -378,7 +378,7 @@ private:
TaskState state;
string message;
- Timer::cancel(escalationTimer);
+ Clock::cancel(escalationTimer);
if (!status_.isReady()) {
state = TASK_FAILED;
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index 6315a85..f7afc38 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -238,7 +238,7 @@ private:
.onFailed(defer(self(), &Self::failed))
.onReady(defer(self(), &Self::succeeded));
- Timer::create(timeout, lambda::bind(&Self::timedout, catching));
+ Clock::timer(timeout, lambda::bind(&Self::timedout, catching));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 83c2f8a..b5fa8d1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -773,7 +773,7 @@ void Master::finalize()
// TODO(vinod): This seems to be a bug in libprocess or the
// testing infrastructure.
if (slaves.recoveredTimer.isSome()) {
- Timer::cancel(slaves.recoveredTimer.get());
+ Clock::cancel(slaves.recoveredTimer.get());
}
terminate(whitelistWatcher);
@@ -4829,7 +4829,7 @@ void Master::removeOffer(Offer* offer, bool rescind)
// Remove and cancel offer removal timers. Canceling the Timers is
// only done to avoid having too many active Timers in libprocess.
if (offerTimers.contains(offer->id())) {
- Timer::cancel(offerTimers[offer->id()]);
+ Clock::cancel(offerTimers[offer->id()]);
offerTimers.erase(offer->id());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/slave/gc.cpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.cpp b/src/slave/gc.cpp
index 73136dd..6042277 100644
--- a/src/slave/gc.cpp
+++ b/src/slave/gc.cpp
@@ -113,7 +113,7 @@ bool GarbageCollectorProcess::unschedule(const string& path)
// existing timer.
void GarbageCollectorProcess::reset()
{
- Timer::cancel(timer); // Cancel the existing timer, if any.
+ Clock::cancel(timer); // Cancel the existing timer, if any.
if (!paths.empty()) {
Timeout removalTime = (*paths.begin()).first; // Get the first entry.
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 275081c..06b2e18 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -795,7 +795,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
// If we don't get a ping from the master, trigger a
// re-registration. This needs to be done once registered,
// in case we never receive an initial ping.
- Timer::cancel(pingTimer);
+ Clock::cancel(pingTimer);
pingTimer = delay(
MASTER_PING_TIMEOUT(),
@@ -2577,7 +2577,7 @@ void Slave::pingOld(const UPID& from, const string& body)
// longer considers the slave to be registered, so it is
// essential for the slave to attempt a re-registration
// when this occurs.
- Timer::cancel(pingTimer);
+ Clock::cancel(pingTimer);
pingTimer = delay(
MASTER_PING_TIMEOUT(),
@@ -2609,7 +2609,7 @@ void Slave::ping(const UPID& from, bool connected)
// longer considers the slave to be registered, so it is
// essential for the slave to attempt a re-registration
// when this occurs.
- Timer::cancel(pingTimer);
+ Clock::cancel(pingTimer);
pingTimer = delay(
MASTER_PING_TIMEOUT(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/788a136f/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 7dee0a1..173caa8 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -332,7 +332,7 @@ void GroupProcess::connected(int64_t sessionId, bool reconnect)
// Cancel and cleanup the reconnect timer (if necessary).
if (timer.isSome()) {
- Timer::cancel(timer.get());
+ Clock::cancel(timer.get());
timer = None();
}
@@ -476,7 +476,7 @@ void GroupProcess::expired(int64_t sessionId)
// Cancel and cleanup the reconnect timer (if necessary).
if (timer.isSome()) {
- Timer::cancel(timer.get());
+ Clock::cancel(timer.get());
timer = None();
}
[11/30] mesos git commit: Started moving libev specific functionality
out of process.cpp.
Posted by be...@apache.org.
Started moving libev specific functionality out of process.cpp.
Review: https://reviews.apache.org/r/27504
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f78ae663
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f78ae663
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f78ae663
Branch: refs/heads/master
Commit: f78ae6635ce0aae89b1b9bd3dd1eae02c1da0936
Parents: 0e19796
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:19:23 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 2 +
3rdparty/libprocess/src/libev.cpp | 21 ++++++++++
3rdparty/libprocess/src/libev.hpp | 70 ++++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 64 ++++-------------------------
4 files changed, 100 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 0008e68..2de9989 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -38,6 +38,8 @@ libprocess_la_SOURCES = \
src/help.cpp \
src/http.cpp \
src/latch.cpp \
+ src/libev.hpp \
+ src/libev.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
src/process.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
new file mode 100644
index 0000000..efc89d8
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -0,0 +1,21 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <stout/lambda.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Defines the initial values for all of the declarations made in
+// libev.hpp (since these need to live in the static data space).
+struct ev_loop* loop = NULL;
+ev_async async_watcher;
+ev_io server_watcher;
+std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+synchronizable(watchers);
+std::queue<lambda::function<void(void)>>* functions =
+ new std::queue<lambda::function<void(void)>>();
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
new file mode 100644
index 0000000..bac8b6a
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -0,0 +1,70 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+
+#include "synchronized.hpp"
+
+namespace process {
+
+// Event loop.
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
+extern ev_async async_watcher;
+
+// Server watcher for accepting connections.
+extern ev_io server_watcher;
+
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
+extern std::queue<ev_io*>* watchers;
+extern synchronizable(watchers);
+
+// Queue of functions to be invoked asynchronously within the vent
+// loop (protected by 'watchers' above).
+extern std::queue<lambda::function<void(void)>>* functions;
+
+
+// Wrapper around function we want to run in the event loop.
+template <typename T>
+void _run_in_event_loop(
+ const lambda::function<Future<T>(void)>& f,
+ const Owned<Promise<T>>& promise)
+{
+ // Don't bother running the function if the future has been discarded.
+ if (promise->future().hasDiscard()) {
+ promise->discard();
+ } else {
+ promise->set(f());
+ }
+}
+
+
+// Helper for running a function in the event loop.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
+{
+ Owned<Promise<T>> promise(new Promise<T>());
+
+ Future<T> future = promise->future();
+
+ // Enqueue the function.
+ synchronized (watchers) {
+ functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+ }
+
+ // Interrupt the loop.
+ ev_async_send(loop, &async_watcher);
+
+ return future;
+}
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bac4200..d96d881 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -83,6 +83,7 @@
#include "decoder.hpp"
#include "encoder.hpp"
#include "gate.hpp"
+#include "libev.hpp"
#include "process_reference.hpp"
#include "synchronized.hpp"
@@ -446,28 +447,6 @@ static SocketManager* socket_manager = NULL;
// Active ProcessManager (eventually will probably be thread-local).
static ProcessManager* process_manager = NULL;
-// Event loop.
-struct ev_loop* loop = NULL;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with IO watchers and functions (via run_in_event_loop).
-static ev_async async_watcher;
-
-// Server watcher for accepting connections.
-static ev_io server_watcher;
-
-// Queue of I/O watchers to be asynchronously added to the event loop
-// (protected by 'watchers' below).
-// TODO(benh): Replace this queue with functions that we put in
-// 'functions' below that perform the ev_io_start themselves.
-static queue<ev_io*>* watchers = new queue<ev_io*>();
-static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
-
-// Queue of functions to be invoked asynchronously within the vent
-// loop (protected by 'watchers' below).
-static queue<lambda::function<void(void)> >* functions =
- new queue<lambda::function<void(void)> >();
-
// Scheduling gate that threads wait at when there is nothing to run.
static Gate* gate = new Gate();
@@ -593,40 +572,6 @@ static Message* parse(Request* request)
return message;
}
-// Wrapper around function we want to run in the event loop.
-template <typename T>
-void _run_in_event_loop(
- const lambda::function<Future<T>(void)>& f,
- const Owned<Promise<T> >& promise)
-{
- // Don't bother running the function if the future has been discarded.
- if (promise->future().hasDiscard()) {
- promise->discard();
- } else {
- promise->set(f());
- }
-}
-
-
-// Helper for running a function in the event loop.
-template <typename T>
-Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
-{
- Owned<Promise<T> > promise(new Promise<T>());
-
- Future<T> future = promise->future();
-
- // Enqueue the function.
- synchronized (watchers) {
- functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
- }
-
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
-
- return future;
-}
-
void handle_async(struct ev_loop* loop, ev_async* _, int revents)
{
@@ -1280,7 +1225,12 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, listen";
}
- // Setup event loop.
+ // Initialize libev.
+ //
+ // TODO(benh): Eventually move this all out of process.cpp after
+ // more is disentangled.
+ synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
#ifdef __sun__
loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
#else
[29/30] mesos git commit: Virtualize backup(length) and remaining()
in Encoder interface.
Posted by be...@apache.org.
Virtualize backup(length) and remaining() in Encoder interface.
These are top level concepts implemented by both data and file encoders.
Review: https://reviews.apache.org/r/27963
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3564c446
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3564c446
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3564c446
Branch: refs/heads/master
Commit: 3564c44642cda195c60e8f4914c59b49dbdff576
Parents: 4a79a69
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:48:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3564c446/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 8040d23..7afde0e 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -49,6 +49,10 @@ public:
virtual Kind kind() const = 0;
+ virtual void backup(size_t length) = 0;
+
+ virtual size_t remaining() const = 0;
+
Socket socket() const
{
return s;
[28/30] mesos git commit: Introduce Kind enumeration in Encoder.
Posted by be...@apache.org.
Introduce Kind enumeration in Encoder.
Introduce enumeration in Encoder to distinguish between data and file
encoders rather than using a function pointer.
Review: https://reviews.apache.org/r/27962
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a79a690
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a79a690
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a79a690
Branch: refs/heads/master
Commit: 4a79a690d02f0910beba4125f0d0c4f0c2f01f46
Parents: 541e8e5
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:47:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a79a690/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 800f324..8040d23 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -37,11 +37,18 @@ typedef void (*Sender)(Encoder*);
class Encoder
{
public:
+ enum Kind {
+ DATA,
+ FILE
+ };
+
explicit Encoder(const Socket& _s) : s(_s) {}
virtual ~Encoder() {}
virtual Sender sender() = 0;
+ virtual Kind kind() const = 0;
+
Socket socket() const
{
return s;
@@ -65,6 +72,11 @@ public:
return send_data;
}
+ virtual Kind kind() const
+ {
+ return Encoder::DATA;
+ }
+
virtual const char* next(size_t* length)
{
size_t temp = index;
@@ -243,6 +255,11 @@ public:
return send_file;
}
+ virtual Kind kind() const
+ {
+ return Encoder::FILE;
+ }
+
virtual int next(off_t* offset, size_t* length)
{
off_t temp = index;
[19/30] mesos git commit: Use Socket::read for ignore_data.
Posted by be...@apache.org.
Use Socket::read for ignore_data.
Review: https://reviews.apache.org/r/27961
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541e8e5b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541e8e5b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541e8e5b
Branch: refs/heads/master
Commit: 541e8e5b2b8353c62fbe42d6e63ca15809b2fdb7
Parents: 383f547
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:47:18 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 89 +++++++++++++++-----------------
1 file changed, 43 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/541e8e5b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c37d522..63f9df4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,48 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
}
-// A variant of 'recv_data' that doesn't do anything with the
-// data. Used by sockets created via SocketManager::link as well as
-// SocketManager::send(Message) where we don't care about the data
-// received we mostly just want to know when the socket has been
-// closed.
-void ignore_data(Socket* socket, int s)
-{
- while (true) {
- const ssize_t size = 80 * 1024;
- ssize_t length = 0;
-
- char data[size];
-
- length = recv(s, data, size, 0);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, socket, s));
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while receiving: " << error;
- } else {
- VLOG(2) << "Socket closed while receiving";
- }
- socket_manager->close(s);
- delete socket;
- break;
- } else {
- VLOG(2) << "Ignoring " << length << " bytes of data received "
- << "on socket used only for sending";
- }
- }
-}
-
-
void send_data(Encoder* e)
{
DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
@@ -1528,6 +1486,31 @@ Socket SocketManager::accepted(int s)
namespace internal {
+void ignore_read_data(
+ const Future<size_t>& length,
+ Socket* socket,
+ char* data,
+ size_t size)
+{
+ if (length.isDiscarded() || length.isFailed()) {
+ socket_manager->close(*socket);
+ delete[] data;
+ delete socket;
+ return;
+ }
+
+ if (length.get() == 0) {
+ socket_manager->close(*socket);
+ delete[] data;
+ delete socket;
+ return;
+ }
+
+ socket->read(data, size)
+ .onAny(lambda::bind(&ignore_read_data, lambda::_1, socket, data, size));
+}
+
+
void link_connect(const Future<Socket>& socket)
{
if (socket.isDiscarded() || socket.isFailed()) {
@@ -1538,8 +1521,15 @@ void link_connect(const Future<Socket>& socket)
return;
}
- io::poll(socket.get(), io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+ size_t size = 80 * 1024;
+ char* data = new char[size];
+ socket.get().read(data, size)
+ .onAny(lambda::bind(
+ &ignore_read_data,
+ lambda::_1,
+ new Socket(socket.get()),
+ data,
+ size));
}
} // namespace internal {
@@ -1677,8 +1667,15 @@ void send_connect(const Future<Socket>& socket, Message* message)
// Read and ignore data from this socket. Note that we don't
// expect to receive anything other than HTTP '202 Accepted'
// responses which we just ignore.
- io::poll(socket.get(), io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+ size_t size = 80 * 1024;
+ char* data = new char[size];
+ socket.get().read(data, size)
+ .onAny(lambda::bind(
+ &ignore_read_data,
+ lambda::_1,
+ new Socket(socket.get()),
+ data,
+ size));
// Start polling in order to send data.
io::poll(socket.get(), io::WRITE)
[07/30] mesos git commit: Removed redundant synchronization and
conditional check.
Posted by be...@apache.org.
Removed redundant synchronization and conditional check.
Review: https://reviews.apache.org/r/27500
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6df6d02c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6df6d02c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6df6d02c
Branch: refs/heads/master
Commit: 6df6d02c2669575037fb0fb5bbbb4d6f1ccdccc1
Parents: b1f2bb9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:57:21 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6df6d02c/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9551d99..0995a9c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2444,11 +2444,7 @@ void SocketManager::exited(ProcessBase* process)
if (linkee == pid) {
foreach (ProcessBase* linker, processes) {
CHECK(linker != process) << "Process linked with itself";
- synchronized (timeouts) {
- if (Clock::paused()) {
- Clock::update(linker, time);
- }
- }
+ Clock::update(linker, time);
linker->enqueue(new ExitedEvent(linkee));
}
}
[08/30] mesos git commit: Introduced a callback for timer expiration.
Posted by be...@apache.org.
Introduced a callback for timer expiration.
This enables seperating the Clock specific functionality from the
ProcessManager functionality so that the Clock implementation can
ultimately be completely isolated.
Review: https://reviews.apache.org/r/27499
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1f2bb9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1f2bb9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1f2bb9b
Branch: refs/heads/master
Commit: b1f2bb9b7eabf74be19bcd642e94842a3a476f30
Parents: 84efa18
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 15:50:36 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/clock.hpp | 10 ++++
3rdparty/libprocess/src/process.cpp | 63 +++++++++++++---------
2 files changed, 48 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index d60742a..ae7d0fb 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -1,6 +1,8 @@
#ifndef __PROCESS_CLOCK_HPP__
#define __PROCESS_CLOCK_HPP__
+#include <list>
+
#include <process/time.hpp>
#include <process/timer.hpp>
@@ -17,6 +19,14 @@ class Timer;
class Clock
{
public:
+ // Initialize the clock with the specified callback that will be
+ // invoked whenever a batch of timers has expired.
+ //
+ // TODO(benh): Introduce a "channel" or listener pattern for getting
+ // the expired Timers rather than passing in a callback. This might
+ // mean we don't need 'initialize' or 'shutdown'.
+ static void initialize(lambda::function<void(std::list<Timer>&&)>&& callback);
+
static Time now();
static Time now(ProcessBase* process);
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e7e5520..9551d99 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -526,9 +526,18 @@ bool paused = false;
// to settle (and we're paused).
bool settling = false;
+// Lambda function to invoke when timers have expired.
+lambda::function<void(list<Timer>&&)> callback;
+
} // namespace clock {
+void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
+{
+ clock::callback = callback;
+}
+
+
Time Clock::now()
{
return now(__process__);
@@ -890,7 +899,7 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
{
- list<Timer> timedout;
+ list<Timer> timers;
synchronized (timeouts) {
Time now = Clock::now();
@@ -912,7 +921,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
}
foreach (const Timer& timer, (*timeouts)[timeout]) {
- timedout.push_back(timer);
+ timers.push_back(timer);
}
}
@@ -948,29 +957,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
update_timer = false; // Since we might have a queued update_timer.
}
- // Update current time of process (if it's present/valid). It might
- // be necessary to actually add some more synchronization around
- // this so that, for example, pausing and resuming the clock doesn't
- // cause some processes to get thier current times updated and
- // others not. Since ProcessManager::use acquires the 'processes'
- // lock we had to move this out of the synchronized (timeouts) above
- // since there was a deadlock with acquring 'processes' then
- // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
- // current time may be greater than the timeout if a local message
- // was received (and happens-before kicks in).
- if (Clock::paused()) {
- foreach (const Timer& timer, timedout) {
- if (ProcessReference process = process_manager->use(timer.creator())) {
- Clock::update(process, timer.timeout().time());
- }
- }
- }
-
- // Invoke the timers that timed out (TODO(benh): Do this
- // asynchronously so that we don't tie up the event thread!).
- foreach (const Timer& timer, timedout) {
- timer();
- }
+ clock::callback(std::move(timers));
// Mark 'settling' as false since there are not any more timeouts
// that will expire before the paused time and we've finished
@@ -1412,6 +1399,27 @@ void* schedule(void* arg)
}
+void timedout(list<Timer>&& timers)
+{
+ // Update current time of process (if it's present/valid). Note that
+ // current time may be greater than the timeout if a local message
+ // was received (and happens-before kicks in).
+ if (Clock::paused()) {
+ foreach (const Timer& timer, timers) {
+ if (ProcessReference process = process_manager->use(timer.creator())) {
+ Clock::update(process, timer.timeout().time());
+ }
+ }
+ }
+
+ // Invoke the timers that timed out (TODO(benh): Do this
+ // asynchronously so that we don't tie up the event thread!).
+ foreach (const Timer& timer, timers) {
+ timer();
+ }
+}
+
+
// We might find value in catching terminating signals at some point.
// However, for now, adding signal handlers freely is not allowed
// because they will clash with Java and Python virtual machines and
@@ -1484,6 +1492,8 @@ void initialize(const string& delegate)
process_manager = new ProcessManager(delegate);
socket_manager = new SocketManager();
+ Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
// Setup processing threads.
// We create no fewer than 8 threads because some tests require
// more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
@@ -1701,6 +1711,9 @@ void initialize(const string& delegate)
void finalize()
{
delete process_manager;
+
+ // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt
+ // to dereference 'process_manager' in the 'timedout' callback.
}
[13/30] mesos git commit: Moved event loop specific polling into
poll.cpp.
Posted by be...@apache.org.
Moved event loop specific polling into poll.cpp.
Review: https://reviews.apache.org/r/27505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/413ce94f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/413ce94f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/413ce94f
Branch: refs/heads/master
Commit: 413ce94f8d74b8c29657eef7dbc2f6ade4143bc7
Parents: f78ae66
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:22:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/poll.cpp | 129 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 112 ---------------------------
3 files changed, 130 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 2de9989..41c3bd1 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES = \
src/libev.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
+ src/poll.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll.cpp b/3rdparty/libprocess/src/poll.cpp
new file mode 100644
index 0000000..324e4dd
--- /dev/null
+++ b/3rdparty/libprocess/src/poll.cpp
@@ -0,0 +1,129 @@
+#include <ev.h>
+
+#include <process/future.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Data necessary for polling so we can discard polling and actually
+// stop it in the event loop.
+struct Poll
+{
+ Poll()
+ {
+ // Need to explicitly instantiate the watchers.
+ watcher.io.reset(new ev_io());
+ watcher.async.reset(new ev_async());
+ }
+
+ // An I/O watcher for checking for readability or writeability and
+ // an async watcher for being able to discard the polling.
+ struct {
+ memory::shared_ptr<ev_io> io;
+ memory::shared_ptr<ev_async> async;
+ } watcher;
+
+ Promise<short> promise;
+};
+
+
+// Event loop callback when I/O is ready on polling file descriptor.
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ Poll* poll = (Poll*) watcher->data;
+
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ // Stop the async watcher (also clears if pending so 'discard_poll'
+ // will not get invoked and we can delete 'poll' here).
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ poll->promise.set(revents);
+
+ delete poll;
+}
+
+
+// Event loop callback when future associated with polling file
+// descriptor has been discarded.
+void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
+{
+ Poll* poll = (Poll*) watcher->data;
+
+ // Check and see if we have a pending 'polled' callback and if so
+ // let it "win".
+ if (ev_is_pending(poll->watcher.io.get())) {
+ return;
+ }
+
+ ev_async_stop(loop, poll->watcher.async.get());
+
+ // Stop the I/O watcher (but note we check if pending above) so it
+ // won't get invoked and we can delete 'poll' here.
+ ev_io_stop(loop, poll->watcher.io.get());
+
+ poll->promise.discard();
+
+ delete poll;
+}
+
+
+namespace io {
+namespace internal {
+
+// Helper/continuation of 'poll' on future discard.
+void _poll(const memory::shared_ptr<ev_async>& async)
+{
+ ev_async_send(loop, async.get());
+}
+
+
+Future<short> poll(int fd, short events)
+{
+ Poll* poll = new Poll();
+
+ // Have the watchers data point back to the struct.
+ poll->watcher.async->data = poll;
+ poll->watcher.io->data = poll;
+
+ // Get a copy of the future to avoid any races with the event loop.
+ Future<short> future = poll->promise.future();
+
+ // Initialize and start the async watcher.
+ ev_async_init(poll->watcher.async.get(), discard_poll);
+ ev_async_start(loop, poll->watcher.async.get());
+
+ // Make sure we stop polling if a discard occurs on our future.
+ // Note that it's possible that we'll invoke '_poll' when someone
+ // does a discard even after the polling has already completed, but
+ // in this case while we will interrupt the event loop since the
+ // async watcher has already been stopped we won't cause
+ // 'discard_poll' to get invoked.
+ future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
+
+ // Initialize and start the I/O watcher.
+ ev_io_init(poll->watcher.io.get(), polled, fd, events);
+ ev_io_start(loop, poll->watcher.io.get());
+
+ return future;
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int fd, short events)
+{
+ process::initialize();
+
+ // TODO(benh): Check if the file descriptor is non-blocking?
+
+ return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
+}
+
+} // namespace io {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d96d881..1fc8874 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -927,69 +927,6 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
}
-// Data necessary for polling so we can discard polling and actually
-// stop it in the event loop.
-struct Poll
-{
- Poll()
- {
- // Need to explicitly instantiate the watchers.
- watcher.io.reset(new ev_io());
- watcher.async.reset(new ev_async());
- }
-
- // An I/O watcher for checking for readability or writeability and
- // an async watcher for being able to discard the polling.
- struct {
- memory::shared_ptr<ev_io> io;
- memory::shared_ptr<ev_async> async;
- } watcher;
-
- Promise<short> promise;
-};
-
-
-// Event loop callback when I/O is ready on polling file descriptor.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- Poll* poll = (Poll*) watcher->data;
-
- ev_io_stop(loop, poll->watcher.io.get());
-
- // Stop the async watcher (also clears if pending so 'discard_poll'
- // will not get invoked and we can delete 'poll' here).
- ev_async_stop(loop, poll->watcher.async.get());
-
- poll->promise.set(revents);
-
- delete poll;
-}
-
-
-// Event loop callback when future associated with polling file
-// descriptor has been discarded.
-void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
-{
- Poll* poll = (Poll*) watcher->data;
-
- // Check and see if we have a pending 'polled' callback and if so
- // let it "win".
- if (ev_is_pending(poll->watcher.io.get())) {
- return;
- }
-
- ev_async_stop(loop, poll->watcher.async.get());
-
- // Stop the I/O watcher (but note we check if pending above) so it
- // won't get invoked and we can delete 'poll' here.
- ev_io_stop(loop, poll->watcher.io.get());
-
- poll->promise.discard();
-
- delete poll;
-}
-
-
void* serve(void* arg)
{
ev_loop(((struct ev_loop*) arg), 0);
@@ -3179,47 +3116,8 @@ void post(const UPID& from,
namespace io {
-
namespace internal {
-// Helper/continuation of 'poll' on future discard.
-void _poll(const memory::shared_ptr<ev_async>& async)
-{
- ev_async_send(loop, async.get());
-}
-
-
-Future<short> poll(int fd, short events)
-{
- Poll* poll = new Poll();
-
- // Have the watchers data point back to the struct.
- poll->watcher.async->data = poll;
- poll->watcher.io->data = poll;
-
- // Get a copy of the future to avoid any races with the event loop.
- Future<short> future = poll->promise.future();
-
- // Initialize and start the async watcher.
- ev_async_init(poll->watcher.async.get(), discard_poll);
- ev_async_start(loop, poll->watcher.async.get());
-
- // Make sure we stop polling if a discard occurs on our future.
- // Note that it's possible that we'll invoke '_poll' when someone
- // does a discard even after the polling has already completed, but
- // in this case while we will interrupt the event loop since the
- // async watcher has already been stopped we won't cause
- // 'discard_poll' to get invoked.
- future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
-
- // Initialize and start the I/O watcher.
- ev_io_init(poll->watcher.io.get(), polled, fd, events);
- ev_io_start(loop, poll->watcher.io.get());
-
- return future;
-}
-
-
void read(
int fd,
void* data,
@@ -3366,16 +3264,6 @@ void write(
} // namespace internal {
-Future<short> poll(int fd, short events)
-{
- process::initialize();
-
- // TODO(benh): Check if the file descriptor is non-blocking?
-
- return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
-}
-
-
Future<size_t> read(int fd, void* data, size_t size)
{
process::initialize();
[15/30] mesos git commit: Used io::poll instead of libev for send*.
Posted by be...@apache.org.
Used io::poll instead of libev for send*.
Updated sending_connect, send_data, and send_file.
Review: https://reviews.apache.org/r/27510
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b774ecb2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b774ecb2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b774ecb2
Branch: refs/heads/master
Commit: b774ecb245288c2a0a33b94e9c92ed0ee806b9c1
Parents: bc23da1
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 21:21:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:26:49 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 9 ++--
3rdparty/libprocess/src/process.cpp | 90 ++++++++++----------------------
2 files changed, 35 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 9c5aa81..800f324 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -25,10 +25,13 @@ namespace process {
const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
-typedef void (*Sender)(struct ev_loop*, ev_io*, int);
+// Forward declarations.
+class Encoder;
-extern void send_data(struct ev_loop*, ev_io*, int);
-extern void send_file(struct ev_loop*, ev_io*, int);
+extern void send_data(Encoder*);
+extern void send_file(Encoder*);
+
+typedef void (*Sender)(Encoder*);
class Encoder
http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a33a201..ac12876 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -681,11 +681,11 @@ void ignore_data(Socket* socket, int s)
}
-void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_data(Encoder* e)
{
- DataEncoder* encoder = (DataEncoder*) watcher->data;
+ DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
- int s = watcher->fd;
+ int s = encoder->socket();
while (true) {
const void* data;
@@ -703,6 +703,8 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
encoder->backup(size);
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&send_data, e));
break;
} else if (length <= 0) {
// Socket error or closed.
@@ -714,8 +716,6 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
}
socket_manager->close(s);
delete encoder;
- ev_io_stop(loop, watcher);
- delete watcher;
break;
} else {
CHECK(length > 0);
@@ -727,18 +727,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
if (encoder->remaining() == 0) {
delete encoder;
- // Stop this watcher for now.
- ev_io_stop(loop, watcher);
-
// Check for more stuff to send on socket.
Encoder* next = socket_manager->next(s);
if (next != NULL) {
- watcher->data = next;
- ev_io_init(watcher, next->sender(), s, EV_WRITE);
- ev_io_start(loop, watcher);
- } else {
- // Nothing more to send right now, clean up.
- delete watcher;
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(next->sender(), next));
}
break;
}
@@ -747,11 +740,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
}
-void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_file(Encoder* e)
{
- FileEncoder* encoder = (FileEncoder*) watcher->data;
+ FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e));
- int s = watcher->fd;
+ int s = encoder->socket();
while (true) {
int fd;
@@ -770,6 +763,8 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
encoder->backup(size);
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&send_file, e));
break;
} else if (length <= 0) {
// Socket error or closed.
@@ -781,8 +776,6 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
}
socket_manager->close(s);
delete encoder;
- ev_io_stop(loop, watcher);
- delete watcher;
break;
} else {
CHECK(length > 0);
@@ -794,18 +787,11 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
if (encoder->remaining() == 0) {
delete encoder;
- // Stop this watcher for now.
- ev_io_stop(loop, watcher);
-
// Check for more stuff to send on socket.
Encoder* next = socket_manager->next(s);
if (next != NULL) {
- watcher->data = next;
- ev_io_init(watcher, next->sender(), s, EV_WRITE);
- ev_io_start(loop, watcher);
- } else {
- // Nothing more to send right now, clean up.
- delete watcher;
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(next->sender(), next));
}
break;
}
@@ -814,9 +800,9 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
}
-void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void sending_connect(Encoder* encoder)
{
- int s = watcher->fd;
+ int s = encoder->socket();
// Now check that a successful connection was made.
int opt;
@@ -826,15 +812,11 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
// Connect failure.
VLOG(1) << "Socket error while connecting";
socket_manager->close(s);
- MessageEncoder* encoder = (MessageEncoder*) watcher->data;
delete encoder;
- ev_io_stop(loop, watcher);
- delete watcher;
} else {
// We're connected! Now let's do some sending.
- ev_io_stop(loop, watcher);
- ev_io_init(watcher, send_data, s, EV_WRITE);
- ev_io_start(loop, watcher);
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&send_data, encoder));
}
}
@@ -1640,17 +1622,9 @@ void SocketManager::send(Encoder* encoder, bool persist)
// Initialize the outgoing queue.
outgoing[encoder->socket()];
- // Allocate and initialize the watcher.
- ev_io* watcher = new ev_io();
- watcher->data = encoder;
-
- ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
-
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- ev_async_send(loop, &async_watcher);
+ // Start polling in order to send with this encoder.
+ io::poll(encoder->socket(), io::WRITE)
+ .onAny(lambda::bind(encoder->sender(), encoder));
}
} else {
VLOG(1) << "Attempting to send on a no longer valid socket!";
@@ -1728,9 +1702,8 @@ void SocketManager::send(Message* message)
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
- // Allocate and initialize a watcher for sending the message.
- ev_io* watcher = new ev_io();
- watcher->data = new MessageEncoder(sockets[s], message);
+ // Create a message encoder to handle sending this message.
+ Encoder* encoder = new MessageEncoder(sockets[s], message);
// Try and connect to the node using this socket.
sockaddr_in addr;
@@ -1744,19 +1717,14 @@ void SocketManager::send(Message* message)
PLOG(FATAL) << "Failed to send, connect";
}
- // Initialize watcher for connecting.
- ev_io_init(watcher, sending_connect, s, EV_WRITE);
+ // Start polling in order to wait for being connected.
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&sending_connect, encoder));
} else {
- // Initialize watcher for sending.
- ev_io_init(watcher, send_data, s, EV_WRITE);
- }
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
+ // Start polling in order to send data.
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&send_data, encoder));
}
-
- ev_async_send(loop, &async_watcher);
}
}
}
[27/30] mesos git commit: Introduce Socket::send() and
Socket::sendfile().
Posted by be...@apache.org.
Introduce Socket::send() and Socket::sendfile().
Also used these when linking sockets.
Review: https://reviews.apache.org/r/27964
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/71de11e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/71de11e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/71de11e9
Branch: refs/heads/master
Commit: 71de11e95f1945031e3ea1308445eddc0850c2be
Parents: 3564c44
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:49:44 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 14 ++
3rdparty/libprocess/src/process.cpp | 180 +++++++++++++++++++-
2 files changed, 186 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index a7c22a0..5fd8d1b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -78,6 +78,10 @@ public:
Future<size_t> read(char* data, size_t size);
+ Future<size_t> send(const char* data, size_t size);
+
+ Future<size_t> sendfile(int fd, off_t offset, size_t size);
+
private:
const Impl& create() const
{
@@ -129,6 +133,16 @@ public:
return impl->read(data, size);
}
+ Future<size_t> send(const char* data, size_t size) const
+ {
+ return impl->send(data, size);
+ }
+
+ Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+ {
+ return impl->sendfile(fd, offset, size);
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 63f9df4..b062b85 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1465,6 +1465,96 @@ Future<size_t> Socket::Impl::read(char* data, size_t size)
}
+namespace internal {
+
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+ CHECK(size > 0);
+
+ while (true) {
+ ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ return io::poll(s, io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, s, data, size));
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ if (length == 0) {
+ return length;
+ } else {
+ return Failure(ErrnoError("Socket send failed"));
+ }
+ } else {
+ CHECK(length > 0);
+
+ return length;
+ }
+ }
+}
+
+
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+ CHECK(size > 0);
+
+ while (true) {
+ ssize_t length = os::sendfile(s, fd, offset, size);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ return io::poll(s, io::WRITE)
+ .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ if (length == 0) {
+ return length;
+ } else {
+ return Failure(ErrnoError("Socket sendfile failed"));
+ }
+ } else {
+ CHECK(length > 0);
+
+ return length;
+ }
+ }
+}
+
+} // namespace internal {
+
+
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1600,27 +1690,101 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
}
+namespace internal {
+
+void _send(
+ const Future<size_t>& result,
+ Socket* socket,
+ Encoder* encoder,
+ size_t size);
+
+
+void send(Encoder* encoder, Socket* socket)
+{
+ switch (encoder->kind()) {
+ case Encoder::DATA: {
+ size_t size;
+ const char* data = reinterpret_cast<DataEncoder*>(encoder)->next(&size);
+ socket->send(data, size)
+ .onAny(lambda::bind(
+ &internal::_send,
+ lambda::_1,
+ socket,
+ encoder,
+ size));
+ break;
+ }
+ case Encoder::FILE: {
+ off_t offset;
+ size_t size;
+ int fd = reinterpret_cast<FileEncoder*>(encoder)->next(&offset, &size);
+ socket->sendfile(fd, offset, size)
+ .onAny(lambda::bind(
+ &internal::_send,
+ lambda::_1,
+ socket,
+ encoder,
+ size));
+ break;
+ }
+ }
+}
+
+
+void _send(
+ const Future<size_t>& length,
+ Socket* socket,
+ Encoder* encoder,
+ size_t size)
+{
+ if (length.isDiscarded() || length.isFailed()) {
+ socket_manager->close(*socket);
+ delete socket;
+ delete encoder;
+ } else {
+ // Update the encoder with the amount sent.
+ encoder->backup(size - length.get());
+
+ // See if there is any more of the message to send.
+ if (encoder->remaining() == 0) {
+ delete encoder;
+
+ // Check for more stuff to send on socket.
+ Encoder* next = socket_manager->next(*socket);
+ if (next != NULL) {
+ send(next, socket);
+ } else {
+ delete socket;
+ }
+ } else {
+ send(encoder, socket);
+ }
+ }
+}
+
+} // namespace internal {
+
+
void SocketManager::send(Encoder* encoder, bool persist)
{
CHECK(encoder != NULL);
synchronized (this) {
- if (sockets.count(encoder->socket()) > 0) {
+ Socket socket = encoder->socket();
+ if (sockets.count(socket) > 0) {
// Update whether or not this socket should get disposed after
// there is no more data to send.
if (!persist) {
- dispose.insert(encoder->socket());
+ dispose.insert(socket);
}
- if (outgoing.count(encoder->socket()) > 0) {
- outgoing[encoder->socket()].push(encoder);
+ if (outgoing.count(socket) > 0) {
+ outgoing[socket].push(encoder);
} else {
// Initialize the outgoing queue.
- outgoing[encoder->socket()];
+ outgoing[socket];
- // Start polling in order to send with this encoder.
- io::poll(encoder->socket(), io::WRITE)
- .onAny(lambda::bind(encoder->sender(), encoder));
+ internal::send(encoder, new Socket(socket));
}
} else {
VLOG(1) << "Attempting to send on a no longer valid socket!";
[06/30] mesos git commit: Simplified redundant Clock::order/update
usage.
Posted by be...@apache.org.
Simplified redundant Clock::order/update usage.
Review: https://reviews.apache.org/r/27501
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1ef91fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1ef91fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1ef91fb
Branch: refs/heads/master
Commit: e1ef91fb0c711d6bf69e72e468a2c2c55684e07f
Parents: 6df6d02
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 16:18:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/clock.hpp | 16 ++++++-
3rdparty/libprocess/src/process.cpp | 51 +++++-----------------
2 files changed, 25 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1ef91fb/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index ae7d0fb..db0fb04 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -45,7 +45,21 @@ public:
static void advance(ProcessBase* process, const Duration& duration);
static void update(const Time& time);
- static void update(ProcessBase* process, const Time& time);
+
+ // When updating the time of a particular process you can specify
+ // whether or not you want to override the existing value even if
+ // you're going backwards in time! SAFE means don't update the
+ // previous Clock for a process if going backwards in time, where as
+ // FORCE forces this change.
+ enum Update {
+ SAFE,
+ FORCE,
+ };
+
+ static void update(
+ ProcessBase* process,
+ const Time& time,
+ Update update = SAFE);
static void order(ProcessBase* from, ProcessBase* to);
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1ef91fb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0995a9c..2282e9b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -663,11 +663,11 @@ void Clock::update(const Time& time)
}
-void Clock::update(ProcessBase* process, const Time& time)
+void Clock::update(ProcessBase* process, const Time& time, Update update)
{
synchronized (timeouts) {
if (clock::paused) {
- if (now(process) < time) {
+ if (now(process) < time || update == Clock::FORCE) {
VLOG(2) << "Clock of " << process->self() << " updated to " << time;
(*clock::currents)[process] = Time(time);
}
@@ -2644,15 +2644,7 @@ bool ProcessManager::deliver(
// the duration of this routine (so that we can look up it's current
// time).
if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (sender != NULL) {
- Clock::order(sender, receiver);
- } else {
- Clock::update(receiver, Clock::now());
- }
- }
- }
+ Clock::update(receiver, Clock::now(sender != NULL ? sender : __process__));
}
receiver->enqueue(event);
@@ -2948,15 +2940,7 @@ void ProcessManager::terminate(
{
if (ProcessReference process = use(pid)) {
if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (sender != NULL) {
- Clock::order(sender, process);
- } else {
- Clock::update(process, Clock::now());
- }
- }
- }
+ Clock::update(process, Clock::now(sender != NULL ? sender : __process__));
}
if (sender != NULL) {
@@ -3293,18 +3277,10 @@ ProcessBase::ProcessBase(const string& id)
pid.node = __node__;
// If using a manual clock, try and set current time of process
- // using happens before relationship between creator and createe!
+ // using happens before relationship between creator (__process__)
+ // and createe (this)!
if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- clock::currents->erase(this); // In case the address is reused!
- if (__process__ != NULL) {
- Clock::order(__process__, this);
- } else {
- Clock::update(this, Clock::now());
- }
- }
- }
+ Clock::update(this, Clock::now(__process__), Clock::FORCE);
}
}
@@ -3512,17 +3488,10 @@ UPID spawn(ProcessBase* process, bool manage)
if (process != NULL) {
// If using a manual clock, try and set current time of process
- // using happens before relationship between spawner and spawnee!
+ // using happens before relationship between spawner (__process__)
+ // and spawnee (process)!
if (Clock::paused()) {
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (__process__ != NULL) {
- Clock::order(__process__, process);
- } else {
- Clock::update(process, Clock::now());
- }
- }
- }
+ Clock::update(process, Clock::now(__process__));
}
return process_manager->spawn(process, manage);
[09/30] mesos git commit: Used io::poll instead of libev for
ignore_data.
Posted by be...@apache.org.
Used io::poll instead of libev for ignore_data.
Review: https://reviews.apache.org/r/27508
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f94d9a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f94d9a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f94d9a8
Branch: refs/heads/master
Commit: 9f94d9a8a42352345bffb0f1be0253952731b316
Parents: 4751167
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:43:29 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 54 ++++++++++++++------------------
1 file changed, 23 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f94d9a8/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 528cb88..bc2884b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -644,12 +644,8 @@ void recv_data(DataDecoder* decoder, int s)
// SocketManager::send(Message) where we don't care about the data
// received we mostly just want to know when the socket has been
// closed.
-void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void ignore_data(Socket* socket, int s)
{
- Socket* socket = (Socket*) watcher->data;
-
- int s = watcher->fd;
-
while (true) {
const ssize_t size = 80 * 1024;
ssize_t length = 0;
@@ -663,6 +659,8 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
continue;
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, socket, s));
break;
} else if (length <= 0) {
// Socket error or closed.
@@ -673,9 +671,7 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
VLOG(2) << "Socket closed while receiving";
}
socket_manager->close(s);
- ev_io_stop(loop, watcher);
delete socket;
- delete watcher;
break;
} else {
VLOG(2) << "Ignoring " << length << " bytes of data received "
@@ -861,9 +857,11 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
delete watcher;
} else {
// We're connected! Now let's do some receiving.
+ Socket* socket = (Socket*) watcher->data;
ev_io_stop(loop, watcher);
- ev_io_init(watcher, ignore_data, s, EV_READ);
- ev_io_start(loop, watcher);
+ delete watcher;
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, socket, s));
}
}
@@ -1591,17 +1589,18 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
// Wait for socket to be connected.
ev_io_init(watcher, receiving_connect, s, EV_WRITE);
- } else {
- ev_io_init(watcher, ignore_data, s, EV_READ);
- }
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
+ // Enqueue the watcher.
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
+ // Interrupt the loop.
+ ev_async_send(loop, &async_watcher);
+ } else {
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
+ }
}
links[to].insert(process);
@@ -1741,21 +1740,14 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- // Allocate and initialize a watcher for reading data from this
- // socket. Note that we don't expect to receive anything other
- // than HTTP '202 Accepted' responses which we anyway ignore.
- ev_io* watcher = new ev_io();
- watcher->data = new Socket(sockets[s]);
-
- ev_io_init(watcher, ignore_data, s, EV_READ);
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
+ // Read and ignore data from this socket. Note that we don't
+ // expect to receive anything other than HTTP '202 Accepted'
+ // responses which we just ignore.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
// Allocate and initialize a watcher for sending the message.
- watcher = new ev_io();
+ ev_io* watcher = new ev_io();
watcher->data = new MessageEncoder(sockets[s], message);
// Try and connect to the node using this socket.
[14/30] mesos git commit: Used io::poll instead of libev for
recv_data.
Posted by be...@apache.org.
Used io::poll instead of libev for recv_data.
Review: https://reviews.apache.org/r/27507
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47511670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47511670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47511670
Branch: refs/heads/master
Commit: 47511670aaad85ab068902151c9c3f84573fbc99
Parents: 37bba65
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:30:48 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 23 ++++++-----------------
1 file changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/47511670/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index aeaac0c..528cb88 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,12 +589,8 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
}
-void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void recv_data(DataDecoder* decoder, int s)
{
- DataDecoder* decoder = (DataDecoder*) watcher->data;
-
- int s = watcher->fd;
-
while (true) {
const ssize_t size = 80 * 1024;
ssize_t length = 0;
@@ -608,6 +604,8 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
continue;
} else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// Might block, try again later.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&recv_data, decoder, s));
break;
} else if (length <= 0) {
// Socket error or closed.
@@ -619,8 +617,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
}
socket_manager->close(s);
delete decoder;
- ev_io_stop(loop, watcher);
- delete watcher;
break;
} else {
CHECK(length > 0);
@@ -636,8 +632,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
VLOG(1) << "Decoder error while receiving";
socket_manager->close(s);
delete decoder;
- ev_io_stop(loop, watcher);
- delete watcher;
break;
}
}
@@ -913,14 +907,9 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
// Inform the socket manager for proper bookkeeping.
const Socket& socket = socket_manager->accepted(s);
- // Allocate and initialize the decoder and watcher.
- DataDecoder* decoder = new DataDecoder(socket);
-
- ev_io* watcher = new ev_io();
- watcher->data = decoder;
-
- ev_io_init(watcher, recv_data, s, EV_READ);
- ev_io_start(loop, watcher);
+ // Start reading from the socket.
+ io::poll(s, io::READ)
+ .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
}
}
[17/30] mesos git commit: Fix race condition in process.cpp.
Posted by be...@apache.org.
Fix race condition in process.cpp.
Fix race condition between encoder creation and io::poll in
SocketManager::Send(Message).
Review: https://reviews.apache.org/r/27956
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fcdae270
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fcdae270
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fcdae270
Branch: refs/heads/master
Commit: fcdae2704dbe6cd04c1f705497ae1113cb29e039
Parents: d1d4340
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:29:26 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:29:27 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fcdae270/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index ac12876..48e5486 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1696,15 +1696,15 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
+ // Create a message encoder to handle sending this message.
+ Encoder* encoder = new MessageEncoder(sockets[s], message);
+
// Read and ignore data from this socket. Note that we don't
// expect to receive anything other than HTTP '202 Accepted'
// responses which we just ignore.
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
- // Create a message encoder to handle sending this message.
- Encoder* encoder = new MessageEncoder(sockets[s], message);
-
// Try and connect to the node using this socket.
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
[23/30] mesos git commit: Introduce std::enable_shared_from_this
configure check in libprocess.
Posted by be...@apache.org.
Introduce std::enable_shared_from_this configure check in libprocess.
Review: https://reviews.apache.org/r/27351
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/702b3824
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/702b3824
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/702b3824
Branch: refs/heads/master
Commit: 702b38240b9a596a4b46983da96ae9ca1c052b5d
Parents: 075859d
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:36:33 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/702b3824/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
index d48a96e..251213a 100644
--- a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
@@ -81,6 +81,19 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
// End scope of uniqueLock.
}
}
+
+ // Check for std::enable_shared_from_this.
+ struct SharedStruct : public std::enable_shared_from_this<SharedStruct>
+ {
+ std::shared_ptr<SharedStruct> get()
+ {
+ return shared_from_this();
+ }
+ };
+
+ // Construct a new shared_ptr using shared_from_this().
+ std::shared_ptr<SharedStruct> object =
+ std::shared_ptr<SharedStruct>(new SharedStruct())->get();
])
AC_DEFUN([AX_CXX_COMPILE_STDCXX_11], [
[24/30] mesos git commit: Remove dead code in process.cpp.
Posted by be...@apache.org.
Remove dead code in process.cpp.
Review: https://reviews.apache.org/r/27959
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/149164bb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/149164bb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/149164bb
Branch: refs/heads/master
Commit: 149164bb477ac5e8065e12f00124f6154a1180fe
Parents: 4d616f8
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:46:15 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 40 --------------------------------
1 file changed, 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/149164bb/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 6916cbb..73d41c9 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -800,46 +800,6 @@ void send_file(Encoder* e)
}
-void sending_connect(Encoder* encoder)
-{
- int s = encoder->socket();
-
- // Now check that a successful connection was made.
- int opt;
- socklen_t optlen = sizeof(opt);
-
- if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
- // Connect failure.
- VLOG(1) << "Socket error while connecting";
- socket_manager->close(s);
- delete encoder;
- } else {
- // We're connected! Now let's do some sending.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&send_data, encoder));
- }
-}
-
-
-void receiving_connect(Socket* socket, int s)
-{
- // Now check that a successful connection was made.
- int opt;
- socklen_t optlen = sizeof(opt);
-
- if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
- // Connect failure.
- VLOG(1) << "Socket error while connecting";
- socket_manager->close(s);
- delete socket;
- } else {
- // We're connected! Now let's do some receiving.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, socket, s));
- }
-}
-
-
void accept(struct ev_loop* loop, ev_io* watcher, int revents)
{
CHECK_EQ(__s__, watcher->fd);
[18/30] mesos git commit: Add read() to Socket interface.
Posted by be...@apache.org.
Add read() to Socket interface.
Also used it when accepting connections in libprocess.
Review: https://reviews.apache.org/r/27960
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/383f5479
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/383f5479
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/383f5479
Branch: refs/heads/master
Commit: 383f5479d71237c033acdda678571a1ad0b993b1
Parents: 149164b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:46:34 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 7 ++
3rdparty/libprocess/src/process.cpp | 133 +++++++++++---------
2 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index fdad91f..a7c22a0 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -76,6 +76,8 @@ public:
Future<Socket> connect(const Node& node);
+ Future<size_t> read(char* data, size_t size);
+
private:
const Impl& create() const
{
@@ -122,6 +124,11 @@ public:
return impl->connect(node);
}
+ Future<size_t> read(char* data, size_t size) const
+ {
+ return impl->read(data, size);
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 73d41c9..c37d522 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,56 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
}
-void recv_data(DataDecoder* decoder, int s)
-{
- while (true) {
- const ssize_t size = 80 * 1024;
- ssize_t length = 0;
-
- char data[size];
-
- length = recv(s, data, size, 0);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&recv_data, decoder, s));
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while receiving: " << error;
- } else {
- VLOG(2) << "Socket closed while receiving";
- }
- socket_manager->close(s);
- delete decoder;
- break;
- } else {
- CHECK(length > 0);
-
- // Decode as much of the data as possible into HTTP requests.
- const deque<Request*>& requests = decoder->decode(data, length);
-
- if (!requests.empty()) {
- foreach (Request* request, requests) {
- process_manager->handle(decoder->socket(), request);
- }
- } else if (requests.empty() && decoder->failed()) {
- VLOG(1) << "Decoder error while receiving";
- socket_manager->close(s);
- delete decoder;
- break;
- }
- }
- }
-}
-
-
// A variant of 'recv_data' that doesn't do anything with the
// data. Used by sockets created via SocketManager::link as well as
// SocketManager::send(Message) where we don't care about the data
@@ -800,6 +750,57 @@ void send_file(Encoder* e)
}
+namespace internal {
+
+void decode_read(
+ const Future<size_t>& length,
+ char* data,
+ size_t size,
+ Socket* socket,
+ DataDecoder* decoder)
+{
+ if (length.isDiscarded() || length.isFailed()) {
+ if (length.isFailed()) {
+ VLOG(1) << "Decode failure: " << length.failure();
+ }
+
+ socket_manager->close(*socket);
+ delete[] data;
+ delete decoder;
+ delete socket;
+ return;
+ }
+
+ if (length.get() == 0) {
+ socket_manager->close(*socket);
+ delete[] data;
+ delete decoder;
+ delete socket;
+ return;
+ }
+ // Decode as much of the data as possible into HTTP requests.
+ const deque<Request*>& requests = decoder->decode(data, length.get());
+
+ if (!requests.empty()) {
+ foreach (Request* request, requests) {
+ process_manager->handle(decoder->socket(), request);
+ }
+ } else if (requests.empty() && decoder->failed()) {
+ VLOG(1) << "Decoder error while receiving";
+ socket_manager->close(*socket);
+ delete[] data;
+ delete decoder;
+ delete socket;
+ return;
+ }
+
+ socket->read(data, size)
+ .onAny(lambda::bind(&decode_read, lambda::_1, data, size, socket, decoder));
+}
+
+} // namespace internal {
+
+
void accept(struct ev_loop* loop, ev_io* watcher, int revents)
{
CHECK_EQ(__s__, watcher->fd);
@@ -837,11 +838,25 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
os::close(s);
} else {
// Inform the socket manager for proper bookkeeping.
- const Socket& socket = socket_manager->accepted(s);
+ Socket socket = socket_manager->accepted(s);
+
+ // Allocate a buffer to read into. This can be replaced later
+ // when socket supports a read function that provides the
+ // buffered data in the resulting callback.
+ const size_t size = 80 * 1024;
+ char* data = new char[size];
+ memset(data, 0, size);
+
+ DataDecoder* decoder = new DataDecoder(socket);
- // Start reading from the socket.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
+ socket.read(data, size)
+ .onAny(lambda::bind(
+ &internal::decode_read,
+ lambda::_1,
+ data,
+ size,
+ new Socket(socket),
+ decoder));
}
}
@@ -1486,6 +1501,12 @@ Future<Socket> Socket::Impl::connect(const Node& node)
}
+Future<size_t> Socket::Impl::read(char* data, size_t size)
+{
+ return io::read(get(), data, size);
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1711,8 +1732,8 @@ Encoder* SocketManager::next(int s)
// We cannot assume 'sockets.count(s) > 0' here because it's
// possible that 's' has been removed with a a call to
// SocketManager::close. For example, it could be the case that a
- // socket has gone to CLOSE_WAIT and the call to 'recv' in
- // recv_data returned 0 causing SocketManager::close to get
+ // socket has gone to CLOSE_WAIT and the call to read in
+ // io::read returned 0 causing SocketManager::close to get
// invoked. Later a call to 'send' or 'sendfile' (e.g., in
// send_data or send_file) can "succeed" (because the socket is
// not "closed" yet because there are still some Socket
[04/30] mesos git commit: Used io::poll instead of libev for
receiving_connect.
Posted by be...@apache.org.
Used io::poll instead of libev for receiving_connect.
Review: https://reviews.apache.org/r/27509
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bc23da1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bc23da1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bc23da1b
Branch: refs/heads/master
Commit: bc23da1b04aeda9d35ee2f9952fc82248b1ff313
Parents: 9f94d9a
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 20:49:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 36 ++++++++------------------------
1 file changed, 9 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bc23da1b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc2884b..a33a201 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -839,10 +839,8 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
}
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void receiving_connect(Socket* socket, int s)
{
- int s = watcher->fd;
-
// Now check that a successful connection was made.
int opt;
socklen_t optlen = sizeof(opt);
@@ -851,15 +849,9 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
// Connect failure.
VLOG(1) << "Socket error while connecting";
socket_manager->close(s);
- Socket* socket = (Socket*) watcher->data;
delete socket;
- ev_io_stop(loop, watcher);
- delete watcher;
} else {
// We're connected! Now let's do some receiving.
- Socket* socket = (Socket*) watcher->data;
- ev_io_stop(loop, watcher);
- delete watcher;
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, socket, s));
}
@@ -1567,15 +1559,12 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
persists[to.node] = s;
- // Allocate and initialize a watcher for reading data from this
- // socket. Note that we don't expect to receive anything other
- // than HTTP '202 Accepted' responses which we anyway ignore.
- // We do, however, want to react when it gets closed so we can
- // generate appropriate lost events (since this is a 'link').
- ev_io* watcher = new ev_io();
- watcher->data = new Socket(sockets[s]);
-
- // Try and connect to the node using this socket.
+ // Try and connect to the node using this socket in order to
+ // start reading data. Note that we don't expect to receive
+ // anything other than HTTP '202 Accepted' responses which we
+ // anyway ignore. We do, however, want to react when it gets
+ // closed so we can generate appropriate lost events (since this
+ // is a 'link').
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = PF_INET;
@@ -1588,15 +1577,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
}
// Wait for socket to be connected.
- ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-
- // Enqueue the watcher.
- synchronized (watchers) {
- watchers->push(watcher);
- }
-
- // Interrupt the loop.
- ev_async_send(loop, &async_watcher);
+ io::poll(s, io::WRITE)
+ .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
} else {
io::poll(s, io::READ)
.onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
[22/30] mesos git commit: Use std::shared_ptr to do reference
counting in Socket.
Posted by be...@apache.org.
Use std::shared_ptr to do reference counting in Socket.
Review: https://reviews.apache.org/r/27957
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/075859d9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/075859d9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/075859d9
Branch: refs/heads/master
Commit: 075859d984aa0b23d8efb18cc9bf0e6a17f3e00f
Parents: 9eda433
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 87 ++++++++++-----------
1 file changed, 42 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/075859d9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 6683881..9f4302e 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -4,6 +4,7 @@
#include <assert.h>
#include <stout/abort.hpp>
+#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
@@ -40,66 +41,62 @@ inline Try<int> socket(int family, int type, int protocol) {
class Socket
{
public:
- Socket()
- : refs(new int(1)), s(-1) {}
+ class Impl
+ {
+ public:
+ Impl() : s(-1) {}
- explicit Socket(int _s)
- : refs(new int(1)), s(_s) {}
+ explicit Impl(int _s) : s(_s) {}
- ~Socket()
- {
- cleanup();
- }
+ ~Impl()
+ {
+ if (s >= 0) {
+ Try<Nothing> close = os::close(s);
+ if (close.isError()) {
+ ABORT(
+ "Failed to close socket " + stringify(s) + ": " + close.error());
+ }
+ }
+ }
- Socket(const Socket& that)
- {
- copy(that);
- }
+ int get() const
+ {
+ return s >= 0 ? s : create().get();
+ }
- Socket& operator = (const Socket& that)
- {
- if (this != &that) {
- cleanup();
- copy(that);
+ private:
+ const Impl& create() const
+ {
+ CHECK(s < 0);
+ Try<int> fd =
+ process::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+ if (fd.isError()) {
+ ABORT("Failed to create socket: " + fd.error());
+ }
+ s = fd.get();
+ return *this;
}
- return *this;
- }
+
+ // Mutable so that the socket can be lazily created.
+ mutable int s;
+ };
+
+ Socket() : impl(std::make_shared<Impl>()) {}
+
+ explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
bool operator == (const Socket& that) const
{
- return s == that.s && refs == that.refs;
+ return impl == that.impl;
}
operator int () const
{
- return s;
+ return impl->get();
}
private:
- void copy(const Socket& that)
- {
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- s = that.s;
- }
-
- void cleanup()
- {
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- delete refs;
- if (s >= 0) {
- Try<Nothing> close = os::close(s);
- if (close.isError()) {
- ABORT("Failed to close socket: " + close.error());
- }
- }
- }
- }
-
- int* refs;
- int s;
+ memory::shared_ptr<Impl> impl;
};
} // namespace process {
[30/30] mesos git commit: Use Socket::send() for temporary
connections.
Posted by be...@apache.org.
Use Socket::send() for temporary connections.
And removed send_data and send_file now that it is not being used.
Review: https://reviews.apache.org/r/27965
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8edab655
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8edab655
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8edab655
Branch: refs/heads/master
Commit: 8edab655c990a310ef6ac66e64f116addcaf147c
Parents: 71de11e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:50:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/encoder.hpp | 17 -----
3rdparty/libprocess/src/process.cpp | 123 +------------------------------
2 files changed, 1 insertion(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp
index 7afde0e..2b0d83f 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -28,11 +28,6 @@ const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
// Forward declarations.
class Encoder;
-extern void send_data(Encoder*);
-extern void send_file(Encoder*);
-
-typedef void (*Sender)(Encoder*);
-
class Encoder
{
@@ -45,8 +40,6 @@ public:
explicit Encoder(const Socket& _s) : s(_s) {}
virtual ~Encoder() {}
- virtual Sender sender() = 0;
-
virtual Kind kind() const = 0;
virtual void backup(size_t length) = 0;
@@ -71,11 +64,6 @@ public:
virtual ~DataEncoder() {}
- virtual Sender sender()
- {
- return send_data;
- }
-
virtual Kind kind() const
{
return Encoder::DATA;
@@ -254,11 +242,6 @@ public:
os::close(fd);
}
- virtual Sender sender()
- {
- return send_file;
- }
-
virtual Kind kind() const
{
return Encoder::FILE;
http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b062b85..9f91020 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,125 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
}
-void send_data(Encoder* e)
-{
- DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
-
- int s = encoder->socket();
-
- while (true) {
- const void* data;
- size_t size;
-
- data = encoder->next(&size);
- CHECK(size > 0);
-
- ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- encoder->backup(size);
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- encoder->backup(size);
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&send_data, e));
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- socket_manager->close(s);
- delete encoder;
- break;
- } else {
- CHECK(length > 0);
-
- // Update the encoder with the amount sent.
- encoder->backup(size - length);
-
- // See if there is any more of the message to send.
- if (encoder->remaining() == 0) {
- delete encoder;
-
- // Check for more stuff to send on socket.
- Encoder* next = socket_manager->next(s);
- if (next != NULL) {
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(next->sender(), next));
- }
- break;
- }
- }
- }
-}
-
-
-void send_file(Encoder* e)
-{
- FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e));
-
- int s = encoder->socket();
-
- while (true) {
- int fd;
- off_t offset;
- size_t size;
-
- fd = encoder->next(&offset, &size);
- CHECK(size > 0);
-
- ssize_t length = os::sendfile(s, fd, offset, size);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- encoder->backup(size);
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- encoder->backup(size);
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&send_file, e));
- break;
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- socket_manager->close(s);
- delete encoder;
- break;
- } else {
- CHECK(length > 0);
-
- // Update the encoder with the amount sent.
- encoder->backup(size - length);
-
- // See if there is any more of the message to send.
- if (encoder->remaining() == 0) {
- delete encoder;
-
- // Check for more stuff to send on socket.
- Encoder* next = socket_manager->next(s);
- if (next != NULL) {
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(next->sender(), next));
- }
- break;
- }
- }
- }
-}
-
-
namespace internal {
void decode_read(
@@ -1841,9 +1722,7 @@ void send_connect(const Future<Socket>& socket, Message* message)
data,
size));
- // Start polling in order to send data.
- io::poll(socket.get(), io::WRITE)
- .onAny(lambda::bind(&send_data, encoder));
+ internal::send(encoder, new Socket(socket.get()));
}
} // namespace internal {
[16/30] mesos git commit: Replaced circular include dependency w/
TODO.
Posted by be...@apache.org.
Replaced circular include dependency w/ TODO.
Review: https://reviews.apache.org/r/28061
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1d43403
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1d43403
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1d43403
Branch: refs/heads/master
Commit: d1d43403b49c4692ca93e545cd94d0c277719b05
Parents: b774ecb
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Nov 14 15:56:41 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:27:57 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/clock.hpp | 5 ++++-
3rdparty/libprocess/include/process/timeout.hpp | 1 -
2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1d43403/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index db0fb04..1fd418b 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -4,7 +4,10 @@
#include <list>
#include <process/time.hpp>
-#include <process/timer.hpp>
+
+// TODO(benh): We should really be including <process/timer.hpp> but
+// there are currently too many circular dependencies in header files
+// that would need to get moved to translation units first.
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1d43403/3rdparty/libprocess/include/process/timeout.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timeout.hpp b/3rdparty/libprocess/include/process/timeout.hpp
index 0bf63e1..5c46d70 100644
--- a/3rdparty/libprocess/include/process/timeout.hpp
+++ b/3rdparty/libprocess/include/process/timeout.hpp
@@ -2,7 +2,6 @@
#define __PROCESS_TIMEOUT_HPP__
#include <process/clock.hpp>
-
#include <process/time.hpp>
#include <stout/duration.hpp>
[25/30] mesos git commit: Add connect() to the Socket interface.
Posted by be...@apache.org.
Add connect() to the Socket interface.
Review: https://reviews.apache.org/r/27958
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d616f8d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d616f8d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d616f8d
Branch: refs/heads/master
Commit: 4d616f8d7194be9b251aa5a3ddc51783c4d4411a
Parents: 702b382
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:45:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 41 ++++-
3rdparty/libprocess/src/process.cpp | 193 +++++++++++---------
2 files changed, 138 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 9f4302e..fdad91f 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -3,8 +3,12 @@
#include <assert.h>
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/node.hpp>
+
#include <stout/abort.hpp>
-#include <stout/memory.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
@@ -14,7 +18,8 @@ namespace process {
// Returns a socket fd for the specified options. Note that on OS X,
// the returned socket will have the SO_NOSIGPIPE option set.
-inline Try<int> socket(int family, int type, int protocol) {
+inline Try<int> socket(int family, int type, int protocol)
+{
int s;
if ((s = ::socket(family, type, protocol)) == -1) {
return ErrnoError();
@@ -41,7 +46,12 @@ inline Try<int> socket(int family, int type, int protocol) {
class Socket
{
public:
- class Impl
+ // Each socket is a reference counted, shared by default, concurrent
+ // object. However, since we want to support multiple
+ // implementations we use the Pimpl pattern (often called the
+ // compilation firewall) rather than forcing each Socket
+ // implementation to do this themselves.
+ class Impl : public std::enable_shared_from_this<Impl>
{
public:
Impl() : s(-1) {}
@@ -53,8 +63,8 @@ public:
if (s >= 0) {
Try<Nothing> close = os::close(s);
if (close.isError()) {
- ABORT(
- "Failed to close socket " + stringify(s) + ": " + close.error());
+ ABORT("Failed to close socket " +
+ stringify(s) + ": " + close.error());
}
}
}
@@ -64,6 +74,8 @@ public:
return s >= 0 ? s : create().get();
}
+ Future<Socket> connect(const Node& node);
+
private:
const Impl& create() const
{
@@ -78,6 +90,11 @@ public:
}
// Mutable so that the socket can be lazily created.
+ //
+ // TODO(benh): Create a factory for sockets and don't lazily
+ // create but instead return a Try<Socket> from the factory in
+ // order to eliminate the need for a mutable member or the call to
+ // ABORT above.
mutable int s;
};
@@ -95,8 +112,20 @@ public:
return impl->get();
}
+ int get() const
+ {
+ return impl->get();
+ }
+
+ Future<Socket> connect(const Node& node)
+ {
+ return impl->connect(node);
+ }
+
private:
- memory::shared_ptr<Impl> impl;
+ explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+
+ std::shared_ptr<Impl> impl;
};
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 48e5486..6916cbb 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1484,6 +1484,48 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
}
+namespace internal {
+
+Future<Socket> connect(const Socket& socket)
+{
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+ int s = socket.get();
+
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ // Connect failure.
+ VLOG(1) << "Socket error while connecting";
+ return Failure("Socket error while connecting");
+ }
+
+ return socket;
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::connect(const Node& node)
+{
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_port = htons(node.port);
+ addr.sin_addr.s_addr = node.ip;
+
+ if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+ if (errno != EINPROGRESS) {
+ return Failure(ErrnoError("Failed to connect socket"));
+ }
+
+ return io::poll(get(), io::WRITE)
+ .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+ }
+
+ return Socket(shared_from_this());
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1503,6 +1545,25 @@ Socket SocketManager::accepted(int s)
}
+namespace internal {
+
+void link_connect(const Future<Socket>& socket)
+{
+ if (socket.isDiscarded() || socket.isFailed()) {
+ if (socket.isFailed()) {
+ VLOG(1) << "Failed to link, connect: " << socket.failure();
+ }
+ socket_manager->close(socket.get());
+ return;
+ }
+
+ io::poll(socket.get(), io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+}
+
+} // namespace internal {
+
+
void SocketManager::link(ProcessBase* process, const UPID& to)
{
// TODO(benh): The semantics we want to support for link are such
@@ -1516,58 +1577,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
synchronized (this) {
+ links[to].insert(process);
+
// Check if node is remote and there isn't a persistant link.
if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
- Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
- if (socket.isError()) {
- LOG(FATAL) << "Failed to link, socket: " << socket.error();
- }
-
- int s = socket.get();
-
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
- }
+ Socket socket;
+ int s = socket;
- sockets[s] = Socket(s);
+ sockets[s] = socket;
nodes[s] = to.node;
persists[to.node] = s;
- // Try and connect to the node using this socket in order to
- // start reading data. Note that we don't expect to receive
- // anything other than HTTP '202 Accepted' responses which we
- // anyway ignore. We do, however, want to react when it gets
- // closed so we can generate appropriate lost events (since this
- // is a 'link').
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_port = htons(to.node.port);
- addr.sin_addr.s_addr = to.node.ip;
-
- if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS) {
- PLOG(FATAL) << "Failed to link, connect";
- }
-
- // Wait for socket to be connected.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
- } else {
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
- }
+ socket.connect(to.node)
+ .onAny(lambda::bind(&internal::link_connect, lambda::_1));
}
-
- links[to].insert(process);
}
}
@@ -1653,11 +1678,40 @@ void SocketManager::send(
}
+namespace internal {
+
+void send_connect(const Future<Socket>& socket, Message* message)
+{
+ if (socket.isDiscarded() || socket.isFailed()) {
+ if (socket.isFailed()) {
+ VLOG(1) << "Failed to send, connect: " << socket.failure();
+ }
+ socket_manager->close(socket.get());
+ delete message;
+ return;
+ }
+
+ Encoder* encoder = new MessageEncoder(socket.get(), message);
+
+ // Read and ignore data from this socket. Note that we don't
+ // expect to receive anything other than HTTP '202 Accepted'
+ // responses which we just ignore.
+ io::poll(socket.get(), io::READ)
+ .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+
+ // Start polling in order to send data.
+ io::poll(socket.get(), io::WRITE)
+ .onAny(lambda::bind(&send_data, encoder));
+}
+
+} // namespace internal {
+
+
void SocketManager::send(Message* message)
{
CHECK(message != NULL);
- Node node(message->to.node);
+ const Node& node = message->to.node;
synchronized (this) {
// Check if there is already a socket.
@@ -1670,24 +1724,10 @@ void SocketManager::send(Message* message)
} else {
// No peristent or temporary socket to the node currently
// exists, so we create a temporary one.
- Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
- if (socket.isError()) {
- LOG(FATAL) << "Failed to send, socket: " << socket.error();
- }
-
- int s = socket.get();
+ Socket socket;
+ int s = socket;
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
- }
-
- sockets[s] = Socket(s);
+ sockets[s] = socket;
nodes[s] = node;
temps[node] = s;
@@ -1696,35 +1736,8 @@ void SocketManager::send(Message* message)
// Initialize the outgoing queue.
outgoing[s];
- // Create a message encoder to handle sending this message.
- Encoder* encoder = new MessageEncoder(sockets[s], message);
-
- // Read and ignore data from this socket. Note that we don't
- // expect to receive anything other than HTTP '202 Accepted'
- // responses which we just ignore.
- io::poll(s, io::READ)
- .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
-
- // Try and connect to the node using this socket.
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_port = htons(node.port);
- addr.sin_addr.s_addr = node.ip;
-
- if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
- if (errno != EINPROGRESS) {
- PLOG(FATAL) << "Failed to send, connect";
- }
-
- // Start polling in order to wait for being connected.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&sending_connect, encoder));
- } else {
- // Start polling in order to send data.
- io::poll(s, io::WRITE)
- .onAny(lambda::bind(&send_data, encoder));
- }
+ socket.connect(node)
+ .onAny(lambda::bind(&internal::send_connect, lambda::_1, message));
}
}
}
[12/30] mesos git commit: Moved Clock implementation into clock.cpp.
Posted by be...@apache.org.
Moved Clock implementation into clock.cpp.
Review: https://reviews.apache.org/r/27503
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e19796d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e19796d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e19796d
Branch: refs/heads/master
Commit: 0e19796dec8db07e2a60df15b88970dc387fbe21
Parents: 18cc45f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 17:04:25 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/clock.cpp | 437 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 410 +----------------------------
3 files changed, 444 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index a55d562..0008e68 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -30,6 +30,7 @@ PICOJSON = 3rdparty/picojson-$(PICOJSON_VERSION)
noinst_LTLIBRARIES = libprocess.la
libprocess_la_SOURCES = \
+ src/clock.cpp \
src/config.hpp \
src/decoder.hpp \
src/encoder.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
new file mode 100644
index 0000000..5623b58
--- /dev/null
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -0,0 +1,437 @@
+#include <ev.h>
+
+#include <glog/logging.h>
+
+#include <list>
+#include <map>
+
+#include <process/clock.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/time.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "synchronized.hpp"
+
+using std::list;
+using std::map;
+
+namespace process {
+
+// Event loop.
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting loop to specifically deal
+// with updating timers.
+static ev_async async_update_timer_watcher;
+
+// Watcher for timeouts.
+static ev_timer timeouts_watcher;
+
+// We store the timers in a map of lists indexed by the timeout of the
+// timer so that we can have two timers that have the same timeout. We
+// exploit that the map is SORTED!
+static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+
+// Flag to indicate whether or to update the timer on async interrupt.
+static bool update_timer = false;
+
+
+// We namespace the clock related variables to keep them well
+// named. In the future we'll probably want to associate a clock with
+// a specific ProcessManager/SocketManager instance pair, so this will
+// likely change.
+namespace clock {
+
+map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
+
+// TODO(dhamon): These static non-POD instances should be replaced by pointers
+// or functions.
+Time initial = Time::epoch();
+Time current = Time::epoch();
+
+Duration advanced = Duration::zero();
+
+bool paused = false;
+
+// For supporting Clock::settled(), false if we're not currently
+// settling (or we're not paused), true if we're currently attempting
+// to settle (and we're paused).
+bool settling = false;
+
+// Lambda function to invoke when timers have expired.
+lambda::function<void(list<Timer>&&)> callback;
+
+} // namespace clock {
+
+
+void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+{
+ synchronized (timeouts) {
+ if (update_timer) {
+ if (!timeouts->empty()) {
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat =
+ (timeouts->begin()->first - Clock::now()).secs();
+
+ if (timeouts_watcher.repeat <= 0) {
+ // Feed the event now!
+ timeouts_watcher.repeat = 0;
+ ev_timer_again(loop, &timeouts_watcher);
+ ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+ } else {
+ // Don't fire the timer if the clock is paused since we
+ // don't want time to advance (instead a call to
+ // clock::advance() will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
+ ev_timer_again(loop, &timeouts_watcher);
+ }
+ }
+
+ update_timer = false;
+ }
+ }
+}
+
+
+void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+{
+ list<Timer> timers;
+
+ synchronized (timeouts) {
+ Time now = Clock::now();
+
+ VLOG(3) << "Handling timeouts up to " << now;
+
+ foreachkey (const Time& timeout, *timeouts) {
+ if (timeout > now) {
+ break;
+ }
+
+ VLOG(3) << "Have timeout(s) at " << timeout;
+
+ // Need to toggle 'settling' so that we don't prematurely say
+ // we're settled until after the timers are executed below,
+ // outside of the critical section.
+ if (clock::paused) {
+ clock::settling = true;
+ }
+
+ foreach (const Timer& timer, (*timeouts)[timeout]) {
+ timers.push_back(timer);
+ }
+ }
+
+ // Now erase the range of timeouts that timed out.
+ timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+
+ // Okay, so the timeout for the next timer should not have fired.
+ CHECK(timeouts->empty() || (timeouts->begin()->first > now));
+
+ // Update the timer as necessary.
+ if (!timeouts->empty()) {
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat =
+ (timeouts->begin()->first - Clock::now()).secs();
+
+ if (timeouts_watcher.repeat <= 0) {
+ // Feed the event now!
+ timeouts_watcher.repeat = 0;
+ ev_timer_again(loop, &timeouts_watcher);
+ ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+ } else {
+ // Don't fire the timer if the clock is paused since we don't
+ // want time to advance (instead a call to Clock::advance()
+ // will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
+ ev_timer_again(loop, &timeouts_watcher);
+ }
+ }
+
+ update_timer = false; // Since we might have a queued update_timer.
+ }
+
+ clock::callback(std::move(timers));
+
+ // Mark 'settling' as false since there are not any more timeouts
+ // that will expire before the paused time and we've finished
+ // executing expired timers.
+ synchronized (timeouts) {
+ if (clock::paused &&
+ (timeouts->size() == 0 ||
+ timeouts->begin()->first > clock::current)) {
+ VLOG(3) << "Clock has settled";
+ clock::settling = false;
+ }
+ }
+}
+
+
+void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
+{
+ // TODO(benh): Currently this function is expected to get called
+ // just after initializing libev in process::initialize. But that is
+ // too tightly coupled so and we really need to move libev specific
+ // intialization outside of process::initialize that both
+ // process::initialize and Clock::initialize can depend on (and thus
+ // call).
+
+ clock::callback = callback;
+
+ ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
+ ev_async_start(loop, &async_update_timer_watcher);
+
+ ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
+ ev_timer_again(loop, &timeouts_watcher);
+}
+
+
+Time Clock::now()
+{
+ return now(__process__);
+}
+
+
+Time Clock::now(ProcessBase* process)
+{
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (process != NULL) {
+ if (clock::currents->count(process) != 0) {
+ return (*clock::currents)[process];
+ } else {
+ return (*clock::currents)[process] = clock::initial;
+ }
+ } else {
+ return clock::current;
+ }
+ }
+ }
+
+ // TODO(benh): Versus ev_now()?
+ double d = ev_time();
+ Try<Time> time = Time::create(d); // Compensates for clock::advanced.
+
+ // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
+ // here.
+ if (time.isError()) {
+ LOG(FATAL) << "Failed to create a Time from " << d << ": "
+ << time.error();
+ }
+ return time.get();
+}
+
+
+Timer Clock::timer(
+ const Duration& duration,
+ const lambda::function<void(void)>& thunk)
+{
+ static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
+
+ // Assumes Clock::now() does Clock::now(__process__).
+ Timeout timeout = Timeout::in(duration);
+
+ UPID pid = __process__ != NULL ? __process__->self() : UPID();
+
+ Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
+
+ VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
+ << " in the future (" << timeout.time() << ")";
+
+ // Add the timer.
+ synchronized (timeouts) {
+ if (timeouts->size() == 0 ||
+ timer.timeout().time() < timeouts->begin()->first) {
+ // Need to interrupt the loop to update/set timer repeat.
+ (*timeouts)[timer.timeout().time()].push_back(timer);
+ update_timer = true;
+ ev_async_send(loop, &async_update_timer_watcher);
+ } else {
+ // Timer repeat is adequate, just add the timeout.
+ CHECK(timeouts->size() >= 1);
+ (*timeouts)[timer.timeout().time()].push_back(timer);
+ }
+ }
+
+ return timer;
+}
+
+
+bool Clock::cancel(const Timer& timer)
+{
+ bool canceled = false;
+ synchronized (timeouts) {
+ // Check if the timeout is still pending, and if so, erase it. In
+ // addition, erase an empty list if we just removed the last
+ // timeout.
+ Time time = timer.timeout().time();
+ if (timeouts->count(time) > 0) {
+ canceled = true;
+ (*timeouts)[time].remove(timer);
+ if ((*timeouts)[time].empty()) {
+ timeouts->erase(time);
+ }
+ }
+ }
+
+ return canceled;
+}
+
+
+void Clock::pause()
+{
+ process::initialize(); // To make sure the libev watchers are ready.
+
+ synchronized (timeouts) {
+ if (!clock::paused) {
+ clock::initial = clock::current = now();
+ clock::paused = true;
+ VLOG(2) << "Clock paused at " << clock::initial;
+ }
+ }
+
+ // Note that after pausing the clock an existing libev timer might
+ // still fire (invoking handle_timeout), but since paused == true no
+ // "time" will actually have passed, so no timer will actually fire.
+}
+
+
+bool Clock::paused()
+{
+ return clock::paused;
+}
+
+
+void Clock::resume()
+{
+ process::initialize(); // To make sure the libev watchers are ready.
+
+ synchronized (timeouts) {
+ if (clock::paused) {
+ VLOG(2) << "Clock resumed at " << clock::current;
+ clock::paused = false;
+ clock::settling = false;
+ clock::currents->clear();
+ update_timer = true;
+ ev_async_send(loop, &async_update_timer_watcher);
+ }
+ }
+}
+
+
+void Clock::advance(const Duration& duration)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ clock::advanced += duration;
+ clock::current += duration;
+ VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_update_timer_watcher);
+ }
+ }
+ }
+}
+
+
+void Clock::advance(ProcessBase* process, const Duration& duration)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ Time current = now(process);
+ current += duration;
+ (*clock::currents)[process] = current;
+ VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
+ << ") to " << current;
+ }
+ }
+}
+
+
+void Clock::update(const Time& time)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ if (clock::current < time) {
+ clock::advanced += (time - clock::current);
+ clock::current = Time(time);
+ VLOG(2) << "Clock updated to " << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_update_timer_watcher);
+ }
+ }
+ }
+ }
+}
+
+
+void Clock::update(ProcessBase* process, const Time& time, Update update)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ if (now(process) < time || update == Clock::FORCE) {
+ VLOG(2) << "Clock of " << process->self() << " updated to " << time;
+ (*clock::currents)[process] = Time(time);
+ }
+ }
+ }
+}
+
+
+void Clock::order(ProcessBase* from, ProcessBase* to)
+{
+ VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
+ update(to, now(from));
+}
+
+
+bool Clock::settled()
+{
+ synchronized (timeouts) {
+ CHECK(clock::paused);
+
+ if (update_timer) {
+ return false;
+ } else if (clock::settling) {
+ VLOG(3) << "Clock still not settled";
+ return false;
+ } else if (timeouts->size() == 0 ||
+ timeouts->begin()->first > clock::current) {
+ VLOG(3) << "Clock is settled";
+ return true;
+ }
+
+ VLOG(3) << "Clock is not settled";
+ return false;
+ }
+}
+
+
+// TODO(benh): Introduce a Clock::time(seconds) that replaces this
+// function for getting a 'Time' value.
+Try<Time> Time::create(double seconds)
+{
+ Try<Duration> duration = Duration::create(seconds);
+ if (duration.isSome()) {
+ // In production code, clock::advanced will always be zero!
+ return Time(duration.get() + clock::advanced);
+ } else {
+ return Error("Argument too large for Time: " + duration.error());
+ }
+}
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index a0b4ca0..bac4200 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -447,19 +447,12 @@ static SocketManager* socket_manager = NULL;
static ProcessManager* process_manager = NULL;
// Event loop.
-static struct ev_loop* loop = NULL;
+struct ev_loop* loop = NULL;
// Asynchronous watcher for interrupting loop to specifically deal
// with IO watchers and functions (via run_in_event_loop).
static ev_async async_watcher;
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
// Server watcher for accepting connections.
static ev_io server_watcher;
@@ -475,15 +468,6 @@ static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
static queue<lambda::function<void(void)> >* functions =
new queue<lambda::function<void(void)> >();
-// We store the timers in a map of lists indexed by the timeout of the
-// timer so that we can have two timers that have the same timeout. We
-// exploit that the map is SORTED!
-static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
-
// Scheduling gate that threads wait at when there is nothing to run.
static Gate* gate = new Gate();
@@ -509,227 +493,15 @@ ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
// const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
-// We namespace the clock related variables to keep them well
-// named. In the future we'll probably want to associate a clock with
-// a specific ProcessManager/SocketManager instance pair, so this will
-// likely change.
-namespace clock {
-
-map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
-
-// TODO(dhamon): These static non-POD instances should be replaced by pointers
-// or functions.
-Time initial = Time::epoch();
-Time current = Time::epoch();
-
-Duration advanced = Duration::zero();
-
-bool paused = false;
-
-// For supporting Clock::settled(), false if we're not currently
-// settling (or we're not paused), true if we're currently attempting
-// to settle (and we're paused).
-bool settling = false;
-
-// Lambda function to invoke when timers have expired.
-lambda::function<void(list<Timer>&&)> callback;
-
-} // namespace clock {
-
-
-void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
-{
- clock::callback = callback;
-}
-
-
-Time Clock::now()
-{
- return now(__process__);
-}
-
-
-Time Clock::now(ProcessBase* process)
-{
- synchronized (timeouts) {
- if (Clock::paused()) {
- if (process != NULL) {
- if (clock::currents->count(process) != 0) {
- return (*clock::currents)[process];
- } else {
- return (*clock::currents)[process] = clock::initial;
- }
- } else {
- return clock::current;
- }
- }
- }
-
- // TODO(benh): Versus ev_now()?
- double d = ev_time();
- Try<Time> time = Time::create(d); // Compensates for clock::advanced.
-
- // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
- // here.
- if (time.isError()) {
- LOG(FATAL) << "Failed to create a Time from " << d << ": "
- << time.error();
- }
- return time.get();
-}
-
-
-void Clock::pause()
-{
- process::initialize(); // To make sure the libev watchers are ready.
-
- synchronized (timeouts) {
- if (!clock::paused) {
- clock::initial = clock::current = now();
- clock::paused = true;
- VLOG(2) << "Clock paused at " << clock::initial;
- }
- }
-
- // Note that after pausing the clock an existing libev timer might
- // still fire (invoking handle_timeout), but since paused == true no
- // "time" will actually have passed, so no timer will actually fire.
-}
-
-
-bool Clock::paused()
-{
- return clock::paused;
-}
-
-
-void Clock::resume()
-{
- process::initialize(); // To make sure the libev watchers are ready.
-
- synchronized (timeouts) {
- if (clock::paused) {
- VLOG(2) << "Clock resumed at " << clock::current;
- clock::paused = false;
- clock::settling = false;
- clock::currents->clear();
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
- }
- }
-}
-
-
-void Clock::advance(const Duration& duration)
-{
- synchronized (timeouts) {
- if (clock::paused) {
- clock::advanced += duration;
- clock::current += duration;
- VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
- }
- }
- }
-}
-
-
-void Clock::advance(ProcessBase* process, const Duration& duration)
-{
- synchronized (timeouts) {
- if (clock::paused) {
- Time current = now(process);
- current += duration;
- (*clock::currents)[process] = current;
- VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
- << ") to " << current;
- }
- }
-}
-
-
-void Clock::update(const Time& time)
-{
- synchronized (timeouts) {
- if (clock::paused) {
- if (clock::current < time) {
- clock::advanced += (time - clock::current);
- clock::current = Time(time);
- VLOG(2) << "Clock updated to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
- }
- }
- }
- }
-}
-
-
-void Clock::update(ProcessBase* process, const Time& time, Update update)
-{
- synchronized (timeouts) {
- if (clock::paused) {
- if (now(process) < time || update == Clock::FORCE) {
- VLOG(2) << "Clock of " << process->self() << " updated to " << time;
- (*clock::currents)[process] = Time(time);
- }
- }
- }
-}
-
-
-void Clock::order(ProcessBase* from, ProcessBase* to)
-{
- VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
- update(to, now(from));
-}
-
-
+// NOTE: Clock::* implementations are in clock.cpp except for
+// Clock::settle which currently has a dependency on
+// 'process_manager'.
void Clock::settle()
{
- CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
process_manager->settle();
}
-bool Clock::settled()
-{
- synchronized (timeouts) {
- CHECK(clock::paused);
-
- if (update_timer) {
- return false;
- } else if (clock::settling) {
- VLOG(3) << "Clock still not settled";
- return false;
- } else if (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current) {
- VLOG(3) << "Clock is settled";
- return true;
- }
-
- VLOG(3) << "Clock is not settled";
-
- return false;
- }
-}
-
-
-Try<Time> Time::create(double seconds)
-{
- Try<Duration> duration = Duration::create(seconds);
- if (duration.isSome()) {
- // In production code, clock::advanced will always be zero!
- return Time(duration.get() + clock::advanced);
- } else {
- return Error("Argument too large for Time: " + duration.error());
- }
-}
-
-
static Message* encode(const UPID& from,
const UPID& to,
const string& name,
@@ -874,114 +646,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
}
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
-{
- synchronized (timeouts) {
- if (update_timer) {
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we
- // don't want time to advance (instead a call to
- // clock::advance() will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false;
- }
- }
-}
-
-
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
-{
- list<Timer> timers;
-
- synchronized (timeouts) {
- Time now = Clock::now();
-
- VLOG(3) << "Handling timeouts up to " << now;
-
- foreachkey (const Time& timeout, *timeouts) {
- if (timeout > now) {
- break;
- }
-
- VLOG(3) << "Have timeout(s) at " << timeout;
-
- // Need to toggle 'settling' so that we don't prematurely say
- // we're settled until after the timers are executed below,
- // outside of the critical section.
- if (clock::paused) {
- clock::settling = true;
- }
-
- foreach (const Timer& timer, (*timeouts)[timeout]) {
- timers.push_back(timer);
- }
- }
-
- // Now erase the range of timeouts that timed out.
- timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
-
- // Okay, so the timeout for the next timer should not have fired.
- CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
- // Update the timer as necessary.
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we don't
- // want time to advance (instead a call to Clock::advance()
- // will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false; // Since we might have a queued update_timer.
- }
-
- clock::callback(std::move(timers));
-
- // Mark 'settling' as false since there are not any more timeouts
- // that will expire before the paused time and we've finished
- // executing expired timers.
- synchronized (timeouts) {
- if (clock::paused &&
- (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current)) {
- VLOG(3) << "Clock has settled";
- clock::settling = false;
- }
- }
-}
-
-
void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
DataDecoder* decoder = (DataDecoder*) watcher->data;
@@ -1501,8 +1165,6 @@ void initialize(const string& delegate)
process_manager = new ProcessManager(delegate);
socket_manager = new SocketManager();
- Clock::initialize(lambda::bind(&timedout, lambda::_1));
-
// Setup processing threads.
// We create no fewer than 8 threads because some tests require
// more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
@@ -1628,15 +1290,11 @@ void initialize(const string& delegate)
ev_async_init(&async_watcher, handle_async);
ev_async_start(loop, &async_watcher);
- ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
- ev_async_start(loop, &async_update_timer_watcher);
-
- ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
- ev_timer_again(loop, &timeouts_watcher);
-
ev_io_init(&server_watcher, accept, __s__, EV_READ);
ev_io_start(loop, &server_watcher);
+ Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
// ev_child_init(&child_watcher, child_exited, pid, 0);
// ev_child_start(loop, &cw);
@@ -3215,62 +2873,6 @@ Future<Response> ProcessManager::__processes__(const Request&)
}
-Timer Clock::timer(
- const Duration& duration,
- const lambda::function<void(void)>& thunk)
-{
- static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
-
- // Assumes Clock::now() does Clock::now(__process__).
- Timeout timeout = Timeout::in(duration);
-
- UPID pid = __process__ != NULL ? __process__->self() : UPID();
-
- Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
-
- VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
- << " in the future (" << timeout.time() << ")";
-
- // Add the timer.
- synchronized (timeouts) {
- if (timeouts->size() == 0 ||
- timer.timeout().time() < timeouts->begin()->first) {
- // Need to interrupt the loop to update/set timer repeat.
- (*timeouts)[timer.timeout().time()].push_back(timer);
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
- } else {
- // Timer repeat is adequate, just add the timeout.
- CHECK(timeouts->size() >= 1);
- (*timeouts)[timer.timeout().time()].push_back(timer);
- }
- }
-
- return timer;
-}
-
-
-bool Clock::cancel(const Timer& timer)
-{
- bool canceled = false;
- synchronized (timeouts) {
- // Check if the timeout is still pending, and if so, erase it. In
- // addition, erase an empty list if we just removed the last
- // timeout.
- Time time = timer.timeout().time();
- if (timeouts->count(time) > 0) {
- canceled = true;
- (*timeouts)[time].remove(timer);
- if ((*timeouts)[time].empty()) {
- timeouts->erase(time);
- }
- }
- }
-
- return canceled;
-}
-
-
ProcessBase::ProcessBase(const string& id)
{
process::initialize();
[10/30] mesos git commit: Moved process::io::* into io.cpp.
Posted by be...@apache.org.
Moved process::io::* into io.cpp.
Review: https://reviews.apache.org/r/27506
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37bba65e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37bba65e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37bba65e
Branch: refs/heads/master
Commit: 37bba65e897e9e06640b7f126fe4871ab917f1ce
Parents: 413ce94
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 18:28:35 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/io.cpp | 647 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 633 ------------------------------
3 files changed, 648 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 41c3bd1..aebd281 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -37,6 +37,7 @@ libprocess_la_SOURCES = \
src/gate.hpp \
src/help.cpp \
src/http.cpp \
+ src/io.cpp \
src/latch.cpp \
src/libev.hpp \
src/libev.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
new file mode 100644
index 0000000..75fe75c
--- /dev/null
+++ b/3rdparty/libprocess/src/io.cpp
@@ -0,0 +1,647 @@
+#include <string>
+
+#include <boost/shared_array.hpp>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include <stout/lambda.hpp>
+#include <stout/memory.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/try.hpp>
+
+using std::string;
+
+namespace process {
+namespace io {
+namespace internal {
+
+void read(
+ int fd,
+ void* data,
+ size_t size,
+ const memory::shared_ptr<Promise<size_t> >& promise,
+ const Future<short>& future)
+{
+ // Ignore this function if the read operation has been discarded.
+ if (promise->future().hasDiscard()) {
+ CHECK(!future.isPending());
+ promise->discard();
+ return;
+ }
+
+ if (size == 0) {
+ promise->set(0);
+ return;
+ }
+
+ if (future.isDiscarded()) {
+ promise->fail("Failed to poll: discarded future");
+ } else if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else {
+ ssize_t length = ::read(fd, data, size);
+ if (length < 0) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+ // Restart the read operation.
+ Future<short> future =
+ io::poll(fd, process::io::READ).onAny(
+ lambda::bind(&internal::read,
+ fd,
+ data,
+ size,
+ promise,
+ lambda::_1));
+
+ // Stop polling if a discard occurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<short>,
+ WeakFuture<short>(future)));
+ } else {
+ // Error occurred.
+ promise->fail(strerror(errno));
+ }
+ } else {
+ promise->set(length);
+ }
+ }
+}
+
+
+void write(
+ int fd,
+ void* data,
+ size_t size,
+ const memory::shared_ptr<Promise<size_t> >& promise,
+ const Future<short>& future)
+{
+ // Ignore this function if the write operation has been discarded.
+ if (promise->future().hasDiscard()) {
+ promise->discard();
+ return;
+ }
+
+ if (size == 0) {
+ promise->set(0);
+ return;
+ }
+
+ if (future.isDiscarded()) {
+ promise->fail("Failed to poll: discarded future");
+ } else if (future.isFailed()) {
+ promise->fail(future.failure());
+ } else {
+ // Do a write but ignore SIGPIPE so we can return an error when
+ // writing to a pipe or socket where the reading end is closed.
+ // TODO(benh): The 'suppress' macro failed to work on OS X as it
+ // appears that signal delivery was happening asynchronously.
+ // That is, the signal would not appear to be pending when the
+ // 'suppress' block was closed thus the destructor for
+ // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
+ // It also appeared that the signal would be delivered to another
+ // thread even if it remained blocked in this thiread. The
+ // workaround here is to check explicitly for EPIPE and then do
+ // 'sigwait' regardless of what 'os::signals::pending' returns. We
+ // don't have that luxury with 'Suppressor' and arbitrary signals
+ // because we don't always have something like EPIPE to tell us
+ // that a signal is (or will soon be) pending.
+ bool pending = os::signals::pending(SIGPIPE);
+ bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
+
+ ssize_t length = ::write(fd, data, size);
+
+ // Save the errno so we can restore it after doing sig* functions
+ // below.
+ int errno_ = errno;
+
+ if (length < 0 && errno == EPIPE && !pending) {
+ sigset_t mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, SIGPIPE);
+
+ int result;
+ do {
+ int ignored;
+ result = sigwait(&mask, &ignored);
+ } while (result == -1 && errno == EINTR);
+ }
+
+ if (unblock) {
+ os::signals::unblock(SIGPIPE);
+ }
+
+ errno = errno_;
+
+ if (length < 0) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+ // Restart the write operation.
+ Future<short> future =
+ io::poll(fd, process::io::WRITE).onAny(
+ lambda::bind(&internal::write,
+ fd,
+ data,
+ size,
+ promise,
+ lambda::_1));
+
+ // Stop polling if a discard occurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<short>,
+ WeakFuture<short>(future)));
+ } else {
+ // Error occurred.
+ promise->fail(strerror(errno));
+ }
+ } else {
+ // TODO(benh): Retry if 'length' is 0?
+ promise->set(length);
+ }
+ }
+}
+
+} // namespace internal {
+
+
+Future<size_t> read(int fd, void* data, size_t size)
+{
+ process::initialize();
+
+ memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+ // Check the file descriptor.
+ Try<bool> nonblock = os::isNonblock(fd);
+ if (nonblock.isError()) {
+ // The file descriptor is not valid (e.g., has been closed).
+ promise->fail(
+ "Failed to check if file descriptor was non-blocking: " +
+ nonblock.error());
+ return promise->future();
+ } else if (!nonblock.get()) {
+ // The file descriptor is not non-blocking.
+ promise->fail("Expected a non-blocking file descriptor");
+ return promise->future();
+ }
+
+ // Because the file descriptor is non-blocking, we call read()
+ // immediately. The read may in turn call poll if necessary,
+ // avoiding unnecessary polling. We also observed that for some
+ // combination of libev and Linux kernel versions, the poll would
+ // block for non-deterministically long periods of time. This may be
+ // fixed in a newer version of libev (we use 3.8 at the time of
+ // writing this comment).
+ internal::read(fd, data, size, promise, io::READ);
+
+ return promise->future();
+}
+
+
+Future<size_t> write(int fd, void* data, size_t size)
+{
+ process::initialize();
+
+ memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
+
+ // Check the file descriptor.
+ Try<bool> nonblock = os::isNonblock(fd);
+ if (nonblock.isError()) {
+ // The file descriptor is not valid (e.g., has been closed).
+ promise->fail(
+ "Failed to check if file descriptor was non-blocking: " +
+ nonblock.error());
+ return promise->future();
+ } else if (!nonblock.get()) {
+ // The file descriptor is not non-blocking.
+ promise->fail("Expected a non-blocking file descriptor");
+ return promise->future();
+ }
+
+ // Because the file descriptor is non-blocking, we call write()
+ // immediately. The write may in turn call poll if necessary,
+ // avoiding unnecessary polling. We also observed that for some
+ // combination of libev and Linux kernel versions, the poll would
+ // block for non-deterministically long periods of time. This may be
+ // fixed in a newer version of libev (we use 3.8 at the time of
+ // writing this comment).
+ internal::write(fd, data, size, promise, io::WRITE);
+
+ return promise->future();
+}
+
+
+namespace internal {
+
+#if __cplusplus >= 201103L
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
+{
+ return io::read(fd, data.get(), length)
+ .then([=] (size_t size) -> Future<string> {
+ if (size == 0) { // EOF.
+ return string(*buffer);
+ }
+ buffer->append(data.get(), size);
+ return _read(fd, buffer, data, length);
+ });
+}
+#else
+// Forward declataion.
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length);
+
+
+Future<string> __read(
+ size_t size,
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
+{
+ if (size == 0) { // EOF.
+ return string(*buffer);
+ }
+
+ buffer->append(data.get(), size);
+
+ return _read(fd, buffer, data, length);
+}
+
+
+Future<string> _read(
+ int fd,
+ const memory::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
+{
+ return io::read(fd, data.get(), length)
+ .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index)
+{
+ return io::write(fd, (void*) (data->data() + index), data->size() - index)
+ .then([=] (size_t length) -> Future<Nothing> {
+ if (index + length == data->size()) {
+ return Nothing();
+ }
+ return _write(fd, data, index + length);
+ });
+}
+#else
+// Forward declaration.
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index);
+
+
+Future<Nothing> __write(
+ int fd,
+ Owned<string> data,
+ size_t index,
+ size_t length)
+{
+ if (index + length == data->size()) {
+ return Nothing();
+ }
+ return _write(fd, data, index + length);
+}
+
+
+Future<Nothing> _write(
+ int fd,
+ Owned<string> data,
+ size_t index)
+{
+ return io::write(fd, (void*) (data->data() + index), data->size() - index)
+ .then(lambda::bind(&__write, fd, data, index, lambda::_1));
+}
+#endif // __cplusplus >= 201103L
+
+
+#if __cplusplus >= 201103L
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing>> promise)
+{
+ // Stop splicing if a discard occured on our future.
+ if (promise->future().hasDiscard()) {
+ // TODO(benh): Consider returning the number of bytes already
+ // spliced on discarded, or a failure. Same for the 'onDiscarded'
+ // callbacks below.
+ promise->discard();
+ return;
+ }
+
+ // Note that only one of io::read or io::write is outstanding at any
+ // one point in time thus the reuse of 'data' for both operations.
+
+ Future<size_t> read = io::read(from, data.get(), chunk);
+
+ // Stop reading (or potentially indefinitely polling) if a discard
+ // occcurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<size_t>,
+ WeakFuture<size_t>(read)));
+
+ read
+ .onReady([=] (size_t size) {
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ // Note that we always try and complete the write, even if a
+ // discard has occured on our future, in order to provide
+ // semantics where everything read is written. The promise
+ // will eventually be discarded in the next read.
+ io::write(to, string(data.get(), size))
+ .onReady([=] () { _splice(from, to, chunk, data, promise); })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->discard(); });
+ }
+ })
+ .onFailed([=] (const string& message) { promise->fail(message); })
+ .onDiscarded([=] () { promise->discard(); });
+}
+#else
+// Forward declarations.
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size);
+
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message);
+
+void ____splice(
+ memory::shared_ptr<Promise<Nothing> > promise);
+
+
+void _splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise)
+{
+ // Stop splicing if a discard occured on our future.
+ if (promise->future().hasDiscard()) {
+ // TODO(benh): Consider returning the number of bytes already
+ // spliced on discarded, or a failure. Same for the 'onDiscarded'
+ // callbacks below.
+ promise->discard();
+ return;
+ }
+
+ Future<size_t> read = io::read(from, data.get(), chunk);
+
+ // Stop reading (or potentially indefinitely polling) if a discard
+ // occurs on our future.
+ promise->future().onDiscard(
+ lambda::bind(&process::internal::discard<size_t>,
+ WeakFuture<size_t>(read)));
+
+ read
+ .onReady(
+ lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&____splice, promise));
+}
+
+
+void __splice(
+ int from,
+ int to,
+ size_t chunk,
+ boost::shared_array<char> data,
+ memory::shared_ptr<Promise<Nothing> > promise,
+ size_t size)
+{
+ if (size == 0) { // EOF.
+ promise->set(Nothing());
+ } else {
+ // Note that we always try and complete the write, even if a
+ // discard has occured on our future, in order to provide
+ // semantics where everything read is written. The promise will
+ // eventually be discarded in the next read.
+ io::write(to, string(data.get(), size))
+ .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
+ .onFailed(lambda::bind(&___splice, promise, lambda::_1))
+ .onDiscarded(lambda::bind(&____splice, promise));
+ }
+}
+
+
+void ___splice(
+ memory::shared_ptr<Promise<Nothing> > promise,
+ const string& message)
+{
+ promise->fail(message);
+}
+
+
+void ____splice(
+ memory::shared_ptr<Promise<Nothing> > promise)
+{
+ promise->discard();
+}
+#endif // __cplusplus >= 201103L
+
+
+Future<Nothing> splice(int from, int to, size_t chunk)
+{
+ boost::shared_array<char> data(new char[chunk]);
+
+ // Rather than having internal::_splice return a future and
+ // implementing internal::_splice as a chain of io::read and
+ // io::write calls, we use an explicit promise that we pass around
+ // so that we don't increase memory usage the longer that we splice.
+ memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
+
+ Future<Nothing> future = promise->future();
+
+ _splice(from, to, chunk, data, promise);
+
+ return future;
+}
+
+} // namespace internal {
+
+
+Future<string> read(int fd)
+{
+ process::initialize();
+
+ // Get our own copy of the file descriptor so that we're in control
+ // of the lifetime and don't crash if/when someone by accidently
+ // closes the file descriptor before discarding this future. We can
+ // also make sure it's non-blocking and will close-on-exec. Start by
+ // checking we've got a "valid" file descriptor before dup'ing.
+ if (fd < 0) {
+ return Failure(strerror(EBADF));
+ }
+
+ fd = dup(fd);
+ if (fd == -1) {
+ return Failure(ErrnoError("Failed to duplicate file descriptor"));
+ }
+
+ // Set the close-on-exec flag.
+ Try<Nothing> cloexec = os::cloexec(fd);
+ if (cloexec.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to set close-on-exec on duplicated file descriptor: " +
+ cloexec.error());
+ }
+
+ // Make the file descriptor is non-blocking.
+ Try<Nothing> nonblock = os::nonblock(fd);
+ if (nonblock.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to make duplicated file descriptor non-blocking: " +
+ nonblock.error());
+ }
+
+ // TODO(benh): Wrap up this data as a struct, use 'Owner'.
+ // TODO(bmahler): For efficiency, use a rope for the buffer.
+ memory::shared_ptr<string> buffer(new string());
+ boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
+
+ return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
+ .onAny(lambda::bind(&os::close, fd));
+}
+
+
+Future<Nothing> write(int fd, const std::string& data)
+{
+ process::initialize();
+
+ // Get our own copy of the file descriptor so that we're in control
+ // of the lifetime and don't crash if/when someone by accidently
+ // closes the file descriptor before discarding this future. We can
+ // also make sure it's non-blocking and will close-on-exec. Start by
+ // checking we've got a "valid" file descriptor before dup'ing.
+ if (fd < 0) {
+ return Failure(strerror(EBADF));
+ }
+
+ fd = dup(fd);
+ if (fd == -1) {
+ return Failure(ErrnoError("Failed to duplicate file descriptor"));
+ }
+
+ // Set the close-on-exec flag.
+ Try<Nothing> cloexec = os::cloexec(fd);
+ if (cloexec.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to set close-on-exec on duplicated file descriptor: " +
+ cloexec.error());
+ }
+
+ // Make the file descriptor is non-blocking.
+ Try<Nothing> nonblock = os::nonblock(fd);
+ if (nonblock.isError()) {
+ os::close(fd);
+ return Failure(
+ "Failed to make duplicated file descriptor non-blocking: " +
+ nonblock.error());
+ }
+
+ return internal::_write(fd, Owned<string>(new string(data)), 0)
+ .onAny(lambda::bind(&os::close, fd));
+}
+
+
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+{
+ // Make sure we've got "valid" file descriptors.
+ if (from < 0 || (to.isSome() && to.get() < 0)) {
+ return Failure(strerror(EBADF));
+ }
+
+ if (to.isNone()) {
+ // Open up /dev/null that we can splice into.
+ Try<int> open = os::open("/dev/null", O_WRONLY);
+
+ if (open.isError()) {
+ return Failure("Failed to open /dev/null for writing: " + open.error());
+ }
+
+ to = open.get();
+ } else {
+ // Duplicate 'to' so that we're in control of its lifetime.
+ int fd = dup(to.get());
+ if (fd == -1) {
+ return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
+ }
+
+ to = fd;
+ }
+
+ CHECK_SOME(to);
+
+ // Duplicate 'from' so that we're in control of its lifetime.
+ from = dup(from);
+ if (from == -1) {
+ return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
+ }
+
+ // Set the close-on-exec flag (no-op if already set).
+ Try<Nothing> cloexec = os::cloexec(from);
+ if (cloexec.isError()) {
+ os::close(from);
+ os::close(to.get());
+ return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
+ }
+
+ cloexec = os::cloexec(to.get());
+ if (cloexec.isError()) {
+ os::close(from);
+ os::close(to.get());
+ return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
+ }
+
+ // Make the file descriptors non-blocking (no-op if already set).
+ Try<Nothing> nonblock = os::nonblock(from);
+ if (nonblock.isError()) {
+ os::close(from);
+ os::close(to.get());
+ return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
+ }
+
+ nonblock = os::nonblock(to.get());
+ if (nonblock.isError()) {
+ os::close(from);
+ os::close(to.get());
+ return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
+ }
+
+ return internal::splice(from, to.get(), chunk)
+ .onAny(lambda::bind(&os::close, from))
+ .onAny(lambda::bind(&os::close, to.get()));
+}
+
+} // namespace io {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/37bba65e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 1fc8874..aeaac0c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -41,8 +41,6 @@
#include <stdexcept>
#include <vector>
-#include <boost/shared_array.hpp>
-
#include <process/check.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
@@ -3115,637 +3113,6 @@ void post(const UPID& from,
}
-namespace io {
-namespace internal {
-
-void read(
- int fd,
- void* data,
- size_t size,
- const memory::shared_ptr<Promise<size_t> >& promise,
- const Future<short>& future)
-{
- // Ignore this function if the read operation has been discarded.
- if (promise->future().hasDiscard()) {
- CHECK(!future.isPending());
- promise->discard();
- return;
- }
-
- if (size == 0) {
- promise->set(0);
- return;
- }
-
- if (future.isDiscarded()) {
- promise->fail("Failed to poll: discarded future");
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else {
- ssize_t length = ::read(fd, data, size);
- if (length < 0) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- // Restart the read operation.
- Future<short> future =
- io::poll(fd, process::io::READ).onAny(
- lambda::bind(&internal::read,
- fd,
- data,
- size,
- promise,
- lambda::_1));
-
- // Stop polling if a discard occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<short>,
- WeakFuture<short>(future)));
- } else {
- // Error occurred.
- promise->fail(strerror(errno));
- }
- } else {
- promise->set(length);
- }
- }
-}
-
-
-void write(
- int fd,
- void* data,
- size_t size,
- const memory::shared_ptr<Promise<size_t> >& promise,
- const Future<short>& future)
-{
- // Ignore this function if the write operation has been discarded.
- if (promise->future().hasDiscard()) {
- promise->discard();
- return;
- }
-
- if (size == 0) {
- promise->set(0);
- return;
- }
-
- if (future.isDiscarded()) {
- promise->fail("Failed to poll: discarded future");
- } else if (future.isFailed()) {
- promise->fail(future.failure());
- } else {
- // Do a write but ignore SIGPIPE so we can return an error when
- // writing to a pipe or socket where the reading end is closed.
- // TODO(benh): The 'suppress' macro failed to work on OS X as it
- // appears that signal delivery was happening asynchronously.
- // That is, the signal would not appear to be pending when the
- // 'suppress' block was closed thus the destructor for
- // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
- // It also appeared that the signal would be delivered to another
- // thread even if it remained blocked in this thiread. The
- // workaround here is to check explicitly for EPIPE and then do
- // 'sigwait' regardless of what 'os::signals::pending' returns. We
- // don't have that luxury with 'Suppressor' and arbitrary signals
- // because we don't always have something like EPIPE to tell us
- // that a signal is (or will soon be) pending.
- bool pending = os::signals::pending(SIGPIPE);
- bool unblock = !pending ? os::signals::block(SIGPIPE) : false;
-
- ssize_t length = ::write(fd, data, size);
-
- // Save the errno so we can restore it after doing sig* functions
- // below.
- int errno_ = errno;
-
- if (length < 0 && errno == EPIPE && !pending) {
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, SIGPIPE);
-
- int result;
- do {
- int ignored;
- result = sigwait(&mask, &ignored);
- } while (result == -1 && errno == EINTR);
- }
-
- if (unblock) {
- os::signals::unblock(SIGPIPE);
- }
-
- errno = errno_;
-
- if (length < 0) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- // Restart the write operation.
- Future<short> future =
- io::poll(fd, process::io::WRITE).onAny(
- lambda::bind(&internal::write,
- fd,
- data,
- size,
- promise,
- lambda::_1));
-
- // Stop polling if a discard occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<short>,
- WeakFuture<short>(future)));
- } else {
- // Error occurred.
- promise->fail(strerror(errno));
- }
- } else {
- // TODO(benh): Retry if 'length' is 0?
- promise->set(length);
- }
- }
-}
-
-} // namespace internal {
-
-
-Future<size_t> read(int fd, void* data, size_t size)
-{
- process::initialize();
-
- memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
- // Check the file descriptor.
- Try<bool> nonblock = os::isNonblock(fd);
- if (nonblock.isError()) {
- // The file descriptor is not valid (e.g., has been closed).
- promise->fail(
- "Failed to check if file descriptor was non-blocking: " +
- nonblock.error());
- return promise->future();
- } else if (!nonblock.get()) {
- // The file descriptor is not non-blocking.
- promise->fail("Expected a non-blocking file descriptor");
- return promise->future();
- }
-
- // Because the file descriptor is non-blocking, we call read()
- // immediately. The read may in turn call poll if necessary,
- // avoiding unnecessary polling. We also observed that for some
- // combination of libev and Linux kernel versions, the poll would
- // block for non-deterministically long periods of time. This may be
- // fixed in a newer version of libev (we use 3.8 at the time of
- // writing this comment).
- internal::read(fd, data, size, promise, io::READ);
-
- return promise->future();
-}
-
-
-Future<size_t> write(int fd, void* data, size_t size)
-{
- process::initialize();
-
- memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());
-
- // Check the file descriptor.
- Try<bool> nonblock = os::isNonblock(fd);
- if (nonblock.isError()) {
- // The file descriptor is not valid (e.g., has been closed).
- promise->fail(
- "Failed to check if file descriptor was non-blocking: " +
- nonblock.error());
- return promise->future();
- } else if (!nonblock.get()) {
- // The file descriptor is not non-blocking.
- promise->fail("Expected a non-blocking file descriptor");
- return promise->future();
- }
-
- // Because the file descriptor is non-blocking, we call write()
- // immediately. The write may in turn call poll if necessary,
- // avoiding unnecessary polling. We also observed that for some
- // combination of libev and Linux kernel versions, the poll would
- // block for non-deterministically long periods of time. This may be
- // fixed in a newer version of libev (we use 3.8 at the time of
- // writing this comment).
- internal::write(fd, data, size, promise, io::WRITE);
-
- return promise->future();
-}
-
-
-namespace internal {
-
-#if __cplusplus >= 201103L
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- return io::read(fd, data.get(), length)
- .then([=] (size_t size) -> Future<string> {
- if (size == 0) { // EOF.
- return string(*buffer);
- }
- buffer->append(data.get(), size);
- return _read(fd, buffer, data, length);
- });
-}
-#else
-// Forward declataion.
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length);
-
-
-Future<string> __read(
- size_t size,
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- if (size == 0) { // EOF.
- return string(*buffer);
- }
-
- buffer->append(data.get(), size);
-
- return _read(fd, buffer, data, length);
-}
-
-
-Future<string> _read(
- int fd,
- const memory::shared_ptr<string>& buffer,
- const boost::shared_array<char>& data,
- size_t length)
-{
- return io::read(fd, data.get(), length)
- .then(lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index)
-{
- return io::write(fd, (void*) (data->data() + index), data->size() - index)
- .then([=] (size_t length) -> Future<Nothing> {
- if (index + length == data->size()) {
- return Nothing();
- }
- return _write(fd, data, index + length);
- });
-}
-#else
-// Forward declaration.
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index);
-
-
-Future<Nothing> __write(
- int fd,
- Owned<string> data,
- size_t index,
- size_t length)
-{
- if (index + length == data->size()) {
- return Nothing();
- }
- return _write(fd, data, index + length);
-}
-
-
-Future<Nothing> _write(
- int fd,
- Owned<string> data,
- size_t index)
-{
- return io::write(fd, (void*) (data->data() + index), data->size() - index)
- .then(lambda::bind(&__write, fd, data, index, lambda::_1));
-}
-#endif // __cplusplus >= 201103L
-
-
-#if __cplusplus >= 201103L
-void _splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing>> promise)
-{
- // Stop splicing if a discard occured on our future.
- if (promise->future().hasDiscard()) {
- // TODO(benh): Consider returning the number of bytes already
- // spliced on discarded, or a failure. Same for the 'onDiscarded'
- // callbacks below.
- promise->discard();
- return;
- }
-
- // Note that only one of io::read or io::write is outstanding at any
- // one point in time thus the reuse of 'data' for both operations.
-
- Future<size_t> read = io::read(from, data.get(), chunk);
-
- // Stop reading (or potentially indefinitely polling) if a discard
- // occcurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<size_t>,
- WeakFuture<size_t>(read)));
-
- read
- .onReady([=] (size_t size) {
- if (size == 0) { // EOF.
- promise->set(Nothing());
- } else {
- // Note that we always try and complete the write, even if a
- // discard has occured on our future, in order to provide
- // semantics where everything read is written. The promise
- // will eventually be discarded in the next read.
- io::write(to, string(data.get(), size))
- .onReady([=] () { _splice(from, to, chunk, data, promise); })
- .onFailed([=] (const string& message) { promise->fail(message); })
- .onDiscarded([=] () { promise->discard(); });
- }
- })
- .onFailed([=] (const string& message) { promise->fail(message); })
- .onDiscarded([=] () { promise->discard(); });
-}
-#else
-// Forward declarations.
-void __splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise,
- size_t size);
-
-void ___splice(
- memory::shared_ptr<Promise<Nothing> > promise,
- const string& message);
-
-void ____splice(
- memory::shared_ptr<Promise<Nothing> > promise);
-
-
-void _splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise)
-{
- // Stop splicing if a discard occured on our future.
- if (promise->future().hasDiscard()) {
- // TODO(benh): Consider returning the number of bytes already
- // spliced on discarded, or a failure. Same for the 'onDiscarded'
- // callbacks below.
- promise->discard();
- return;
- }
-
- Future<size_t> read = io::read(from, data.get(), chunk);
-
- // Stop reading (or potentially indefinitely polling) if a discard
- // occurs on our future.
- promise->future().onDiscard(
- lambda::bind(&process::internal::discard<size_t>,
- WeakFuture<size_t>(read)));
-
- read
- .onReady(
- lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
- .onFailed(lambda::bind(&___splice, promise, lambda::_1))
- .onDiscarded(lambda::bind(&____splice, promise));
-}
-
-
-void __splice(
- int from,
- int to,
- size_t chunk,
- boost::shared_array<char> data,
- memory::shared_ptr<Promise<Nothing> > promise,
- size_t size)
-{
- if (size == 0) { // EOF.
- promise->set(Nothing());
- } else {
- // Note that we always try and complete the write, even if a
- // discard has occured on our future, in order to provide
- // semantics where everything read is written. The promise will
- // eventually be discarded in the next read.
- io::write(to, string(data.get(), size))
- .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
- .onFailed(lambda::bind(&___splice, promise, lambda::_1))
- .onDiscarded(lambda::bind(&____splice, promise));
- }
-}
-
-
-void ___splice(
- memory::shared_ptr<Promise<Nothing> > promise,
- const string& message)
-{
- promise->fail(message);
-}
-
-
-void ____splice(
- memory::shared_ptr<Promise<Nothing> > promise)
-{
- promise->discard();
-}
-#endif // __cplusplus >= 201103L
-
-
-Future<Nothing> splice(int from, int to, size_t chunk)
-{
- boost::shared_array<char> data(new char[chunk]);
-
- // Rather than having internal::_splice return a future and
- // implementing internal::_splice as a chain of io::read and
- // io::write calls, we use an explicit promise that we pass around
- // so that we don't increase memory usage the longer that we splice.
- memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());
-
- Future<Nothing> future = promise->future();
-
- _splice(from, to, chunk, data, promise);
-
- return future;
-}
-
-} // namespace internal {
-
-
-Future<string> read(int fd)
-{
- process::initialize();
-
- // Get our own copy of the file descriptor so that we're in control
- // of the lifetime and don't crash if/when someone by accidently
- // closes the file descriptor before discarding this future. We can
- // also make sure it's non-blocking and will close-on-exec. Start by
- // checking we've got a "valid" file descriptor before dup'ing.
- if (fd < 0) {
- return Failure(strerror(EBADF));
- }
-
- fd = dup(fd);
- if (fd == -1) {
- return Failure(ErrnoError("Failed to duplicate file descriptor"));
- }
-
- // Set the close-on-exec flag.
- Try<Nothing> cloexec = os::cloexec(fd);
- if (cloexec.isError()) {
- os::close(fd);
- return Failure(
- "Failed to set close-on-exec on duplicated file descriptor: " +
- cloexec.error());
- }
-
- // Make the file descriptor is non-blocking.
- Try<Nothing> nonblock = os::nonblock(fd);
- if (nonblock.isError()) {
- os::close(fd);
- return Failure(
- "Failed to make duplicated file descriptor non-blocking: " +
- nonblock.error());
- }
-
- // TODO(benh): Wrap up this data as a struct, use 'Owner'.
- // TODO(bmahler): For efficiency, use a rope for the buffer.
- memory::shared_ptr<string> buffer(new string());
- boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
-
- return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE)
- .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> write(int fd, const std::string& data)
-{
- process::initialize();
-
- // Get our own copy of the file descriptor so that we're in control
- // of the lifetime and don't crash if/when someone by accidently
- // closes the file descriptor before discarding this future. We can
- // also make sure it's non-blocking and will close-on-exec. Start by
- // checking we've got a "valid" file descriptor before dup'ing.
- if (fd < 0) {
- return Failure(strerror(EBADF));
- }
-
- fd = dup(fd);
- if (fd == -1) {
- return Failure(ErrnoError("Failed to duplicate file descriptor"));
- }
-
- // Set the close-on-exec flag.
- Try<Nothing> cloexec = os::cloexec(fd);
- if (cloexec.isError()) {
- os::close(fd);
- return Failure(
- "Failed to set close-on-exec on duplicated file descriptor: " +
- cloexec.error());
- }
-
- // Make the file descriptor is non-blocking.
- Try<Nothing> nonblock = os::nonblock(fd);
- if (nonblock.isError()) {
- os::close(fd);
- return Failure(
- "Failed to make duplicated file descriptor non-blocking: " +
- nonblock.error());
- }
-
- return internal::_write(fd, Owned<string>(new string(data)), 0)
- .onAny(lambda::bind(&os::close, fd));
-}
-
-
-Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
-{
- // Make sure we've got "valid" file descriptors.
- if (from < 0 || (to.isSome() && to.get() < 0)) {
- return Failure(strerror(EBADF));
- }
-
- if (to.isNone()) {
- // Open up /dev/null that we can splice into.
- Try<int> open = os::open("/dev/null", O_WRONLY);
-
- if (open.isError()) {
- return Failure("Failed to open /dev/null for writing: " + open.error());
- }
-
- to = open.get();
- } else {
- // Duplicate 'to' so that we're in control of its lifetime.
- int fd = dup(to.get());
- if (fd == -1) {
- return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
- }
-
- to = fd;
- }
-
- CHECK_SOME(to);
-
- // Duplicate 'from' so that we're in control of its lifetime.
- from = dup(from);
- if (from == -1) {
- return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
- }
-
- // Set the close-on-exec flag (no-op if already set).
- Try<Nothing> cloexec = os::cloexec(from);
- if (cloexec.isError()) {
- os::close(from);
- os::close(to.get());
- return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
- }
-
- cloexec = os::cloexec(to.get());
- if (cloexec.isError()) {
- os::close(from);
- os::close(to.get());
- return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
- }
-
- // Make the file descriptors non-blocking (no-op if already set).
- Try<Nothing> nonblock = os::nonblock(from);
- if (nonblock.isError()) {
- os::close(from);
- os::close(to.get());
- return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
- }
-
- nonblock = os::nonblock(to.get());
- if (nonblock.isError()) {
- os::close(from);
- os::close(to.get());
- return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
- }
-
- return internal::splice(from, to.get(), chunk)
- .onAny(lambda::bind(&os::close, from))
- .onAny(lambda::bind(&os::close, to.get()));
-}
-
-} // namespace io {
-
-
namespace inject {
bool exited(const UPID& from, const UPID& to)
[26/30] mesos git commit: Add bind(), listen(),
and accept() to Socket interface.
Posted by be...@apache.org.
Add bind(), listen(), and accept() to Socket interface.
Review: https://reviews.apache.org/r/27966
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8279b45e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8279b45e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8279b45e
Branch: refs/heads/master
Commit: 8279b45ed325375bbf8cc5919fd7009c7d2335cf
Parents: 8edab65
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 16:55:07 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:22 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/socket.hpp | 21 ++
3rdparty/libprocess/src/libev.cpp | 1 -
3rdparty/libprocess/src/libev.hpp | 3 -
3rdparty/libprocess/src/process.cpp | 235 +++++++++++---------
4 files changed, 153 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 5fd8d1b..c022924 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -82,6 +82,12 @@ public:
Future<size_t> sendfile(int fd, off_t offset, size_t size);
+ Try<Node> bind(const Node& node);
+
+ Try<Nothing> listen(int backlog);
+
+ Future<Socket> accept();
+
private:
const Impl& create() const
{
@@ -143,6 +149,21 @@ public:
return impl->sendfile(fd, offset, size);
}
+ Try<Node> bind(const Node& node)
+ {
+ return impl->bind(node);
+ }
+
+ Try<Nothing> listen(int backlog)
+ {
+ return impl->listen(backlog);
+ }
+
+ Future<Socket> accept()
+ {
+ return impl->accept();
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index efc89d8..6560050 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -12,7 +12,6 @@ namespace process {
// libev.hpp (since these need to live in the static data space).
struct ev_loop* loop = NULL;
ev_async async_watcher;
-ev_io server_watcher;
std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
synchronizable(watchers);
std::queue<lambda::function<void(void)>>* functions =
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index bac8b6a..04847e3 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -18,9 +18,6 @@ extern struct ev_loop* loop;
// with IO watchers and functions (via run_in_event_loop).
extern ev_async async_watcher;
-// Server watcher for accepting connections.
-extern ev_io server_watcher;
-
// Queue of I/O watchers to be asynchronously added to the event loop
// (protected by 'watchers' below).
// TODO(benh): Replace this queue with functions that we put in
http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9f91020..5d3b947 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -267,7 +267,7 @@ public:
SocketManager();
~SocketManager();
- Socket accepted(int s);
+ void accepted(const Socket& socket);
void link(ProcessBase* process, const UPID& to);
@@ -433,8 +433,11 @@ const string Profiler::STOP_HELP = HELP(
// Unique id that can be assigned to each process.
static uint32_t __id__ = 0;
+// Server socket listen backlog.
+static const int LISTEN_BACKLOG = 500000;
+
// Local server socket.
-static int __s__ = -1;
+static Socket __s__;
// Local node.
static Node __node__;
@@ -640,66 +643,6 @@ void decode_read(
} // namespace internal {
-void accept(struct ev_loop* loop, ev_io* watcher, int revents)
-{
- CHECK_EQ(__s__, watcher->fd);
-
- sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
-
- if (s < 0) {
- return;
- }
-
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return;
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return;
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- } else {
- // Inform the socket manager for proper bookkeeping.
- Socket socket = socket_manager->accepted(s);
-
- // Allocate a buffer to read into. This can be replaced later
- // when socket supports a read function that provides the
- // buffered data in the resulting callback.
- const size_t size = 80 * 1024;
- char* data = new char[size];
- memset(data, 0, size);
-
- DataDecoder* decoder = new DataDecoder(socket);
-
- socket.read(data, size)
- .onAny(lambda::bind(
- &internal::decode_read,
- lambda::_1,
- data,
- size,
- new Socket(socket),
- decoder));
- }
-}
-
-
void* serve(void* arg)
{
ev_loop(((struct ev_loop*) arg), 0);
@@ -765,6 +708,37 @@ void timedout(list<Timer>&& timers)
// }
+namespace internal {
+
+void on_accept(const Future<Socket>& socket)
+{
+ if (socket.isReady()) {
+ // Inform the socket manager for proper bookkeeping.
+ socket_manager->accepted(socket.get());
+
+ const size_t size = 80 * 1024;
+ char* data = new char[size];
+ memset(data, 0, size);
+
+ DataDecoder* decoder = new DataDecoder(socket.get());
+
+ socket.get().read(data, size)
+ .onAny(lambda::bind(
+ &internal::decode_read,
+ lambda::_1,
+ data,
+ size,
+ new Socket(socket.get()),
+ decoder));
+ }
+
+ __s__.accept()
+ .onAny(lambda::bind(&on_accept, lambda::_1));
+}
+
+} // namespace internal {
+
+
void initialize(const string& delegate)
{
// TODO(benh): Return an error if attempting to initialize again
@@ -866,21 +840,6 @@ void initialize(const string& delegate)
}
// Create a "server" socket for communicating with other nodes.
- if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- PLOG(FATAL) << "Failed to initialize, socket";
- }
-
- // Make socket non-blocking.
- Try<Nothing> nonblock = os::nonblock(__s__);
- if (nonblock.isError()) {
- LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
- }
-
- // Set FD_CLOEXEC flag.
- Try<Nothing> cloexec = os::cloexec(__s__);
- if (cloexec.isError()) {
- LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
- }
// Allow address reuse.
int on = 1;
@@ -888,25 +847,12 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- // Set up socket.
- sockaddr_in addr;
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = __node__.ip;
- addr.sin_port = htons(__node__.port);
-
- if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
- PLOG(FATAL) << "Failed to initialize, bind " << __node__;
- }
-
- // Lookup and store assigned ip and assigned port.
- socklen_t addrlen = sizeof(addr);
- if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
- PLOG(FATAL) << "Failed to initialize, getsockname";
+ Try<Node> bind = __s__.bind(__node__);
+ if (bind.isError()) {
+ PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
- __node__.ip = addr.sin_addr.s_addr;
- __node__.port = ntohs(addr.sin_port);
+ __node__ = bind.get();
// Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
// actually have a valid external ip address. Note that we need only
@@ -931,8 +877,9 @@ void initialize(const string& delegate)
__node__.ip = *((uint32_t *) he->h_addr_list[0]);
}
- if (listen(__s__, 500000) < 0) {
- PLOG(FATAL) << "Failed to initialize, listen";
+ Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+ if (listen.isError()) {
+ PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
// Initialize libev.
@@ -950,9 +897,6 @@ void initialize(const string& delegate)
ev_async_init(&async_watcher, handle_async);
ev_async_start(loop, &async_watcher);
- ev_io_init(&server_watcher, accept, __s__, EV_READ);
- ev_io_start(loop, &server_watcher);
-
Clock::initialize(lambda::bind(&timedout, lambda::_1));
// ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -979,6 +923,9 @@ void initialize(const string& delegate)
// 'spawn' below for the garbage collector.
initializing = false;
+ __s__.accept()
+ .onAny(lambda::bind(&internal::on_accept, lambda::_1));
+
// TODO(benh): Make sure creating the garbage collector, logging
// process, and profiler always succeeds and use supervisors to make
// sure that none terminate.
@@ -1436,6 +1383,90 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
}
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_addr.s_addr = node.ip;
+ addr.sin_port = htons(node.port);
+
+ if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
+ return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) +
+ ":" + stringify(node.port));
+ }
+
+ // Lookup and store assigned ip and assigned port.
+ socklen_t addrlen = sizeof(addr);
+ if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) {
+ return ErrnoError("Failed to bind, getsockname");
+ }
+
+ return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+ if (::listen(get(), backlog) < 0) {
+ return ErrnoError();
+ }
+ return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ int s = ::accept(fd, (sockaddr*) &addr, &addrlen);
+
+ if (s < 0) {
+ return Failure(ErrnoError("Failed to accept"));
+ }
+
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+
+ return Socket(s);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+ return io::poll(get(), io::READ)
+ .then(lambda::bind(&internal::accept, get()));
+}
+
+
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1445,13 +1476,11 @@ SocketManager::SocketManager()
SocketManager::~SocketManager() {}
-Socket SocketManager::accepted(int s)
+void SocketManager::accepted(const Socket& socket)
{
synchronized (this) {
- return sockets[s] = Socket(s);
+ sockets[socket] = socket;
}
-
- UNREACHABLE();
}
[20/30] mesos git commit: Introduce std::make_shared configure check
in libprocess.
Posted by be...@apache.org.
Introduce std::make_shared configure check in libprocess.
Review: https://reviews.apache.org/r/27920
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b7f7c98b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b7f7c98b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b7f7c98b
Branch: refs/heads/master
Commit: b7f7c98bd04f2a1c62cc3c034572d779ccc0e8fe
Parents: fcdae27
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:34:50 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b7f7c98b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4 b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
index eebb4bd..d48a96e 100644
--- a/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/3rdparty/libprocess/m4/ax_cxx_compile_stdcxx_11.m4
@@ -64,6 +64,8 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
std::unique_ptr<int> i(new int());
int j = foo(std::move(i));
+ std::shared_ptr<int> k = std::make_shared<int>(2);
+
void mutexTest()
{
std::mutex mutex;
[05/30] mesos git commit: Introduced a libev async watcher
specifically for updating timers.
Posted by be...@apache.org.
Introduced a libev async watcher specifically for updating timers.
Review: https://reviews.apache.org/r/27502
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/18cc45f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/18cc45f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/18cc45f1
Branch: refs/heads/master
Commit: 18cc45f1f0c27e1766be9c1227c24d63c14bbcb0
Parents: e1ef91f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sun Nov 2 16:49:22 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:58 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 22 +++++++++++++++++-----
1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/18cc45f1/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2282e9b..a0b4ca0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -449,9 +449,14 @@ static ProcessManager* process_manager = NULL;
// Event loop.
static struct ev_loop* loop = NULL;
-// Asynchronous watcher for interrupting loop.
+// Asynchronous watcher for interrupting loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
static ev_async async_watcher;
+// Asynchronous watcher for interrupting loop to specifically deal
+// with updating timers.
+static ev_async async_update_timer_watcher;
+
// Watcher for timeouts.
static ev_timer timeouts_watcher;
@@ -609,7 +614,7 @@ void Clock::resume()
clock::settling = false;
clock::currents->clear();
update_timer = true;
- ev_async_send(loop, &async_watcher);
+ ev_async_send(loop, &async_update_timer_watcher);
}
}
}
@@ -624,7 +629,7 @@ void Clock::advance(const Duration& duration)
VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
if (!update_timer) {
update_timer = true;
- ev_async_send(loop, &async_watcher);
+ ev_async_send(loop, &async_update_timer_watcher);
}
}
}
@@ -655,7 +660,7 @@ void Clock::update(const Time& time)
VLOG(2) << "Clock updated to " << clock::current;
if (!update_timer) {
update_timer = true;
- ev_async_send(loop, &async_watcher);
+ ev_async_send(loop, &async_update_timer_watcher);
}
}
}
@@ -866,7 +871,11 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
functions->pop();
}
}
+}
+
+void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+{
synchronized (timeouts) {
if (update_timer) {
if (!timeouts->empty()) {
@@ -1619,6 +1628,9 @@ void initialize(const string& delegate)
ev_async_init(&async_watcher, handle_async);
ev_async_start(loop, &async_watcher);
+ ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
+ ev_async_start(loop, &async_update_timer_watcher);
+
ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
ev_timer_again(loop, &timeouts_watcher);
@@ -3226,7 +3238,7 @@ Timer Clock::timer(
// Need to interrupt the loop to update/set timer repeat.
(*timeouts)[timer.timeout().time()].push_back(timer);
update_timer = true;
- ev_async_send(loop, &async_watcher);
+ ev_async_send(loop, &async_update_timer_watcher);
} else {
// Timer repeat is adequate, just add the timeout.
CHECK(timeouts->size() >= 1);
[03/30] mesos git commit: Replaced Timer::create/cancel with
Clock::timer/cancel.
Posted by be...@apache.org.
Replaced Timer::create/cancel with Clock::timer/cancel.
Review: https://reviews.apache.org/r/27496
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/303ee1c3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/303ee1c3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/303ee1c3
Branch: refs/heads/master
Commit: 303ee1c3f23c0bb2cc6cfd58982848ea225ad2df
Parents: acd656c
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Nov 1 16:00:53 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 16:25:57 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/c++11/delay.hpp | 5 +++--
3rdparty/libprocess/include/process/clock.hpp | 16 +++++++++++++++-
3rdparty/libprocess/include/process/delay.hpp | 5 +++--
3rdparty/libprocess/include/process/future.hpp | 5 +++--
3rdparty/libprocess/include/process/timer.hpp | 11 ++++-------
3rdparty/libprocess/src/process.cpp | 4 ++--
6 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/c++11/delay.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/c++11/delay.hpp b/3rdparty/libprocess/include/process/c++11/delay.hpp
index 5f686db..5818e83 100644
--- a/3rdparty/libprocess/include/process/c++11/delay.hpp
+++ b/3rdparty/libprocess/include/process/c++11/delay.hpp
@@ -1,6 +1,7 @@
#ifndef __PROCESS_DELAY_HPP__
#define __PROCESS_DELAY_HPP__
+#include <process/clock.hpp>
#include <process/dispatch.hpp>
#include <process/timer.hpp>
@@ -19,7 +20,7 @@ Timer delay(const Duration& duration,
const PID<T>& pid,
void (T::*method)())
{
- return Timer::create(duration, [=] () {
+ return Clock::timer(duration, [=] () {
dispatch(pid, method);
});
}
@@ -52,7 +53,7 @@ Timer delay(const Duration& duration,
void (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, a)) \
{ \
- return Timer::create(duration, [=] () { \
+ return Clock::timer(duration, [=] () { \
dispatch(pid, method, ENUM_PARAMS(N, a)); \
}); \
} \
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index eb157ca..80190ef 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -2,27 +2,41 @@
#define __PROCESS_CLOCK_HPP__
#include <process/time.hpp>
+#include <process/timer.hpp>
#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
namespace process {
-// Forward declarations.
+// Forward declarations (to avoid circular dependencies).
class ProcessBase;
class Time;
+class Timer;
class Clock
{
public:
static Time now();
static Time now(ProcessBase* process);
+
+ static Timer timer(
+ const Duration& duration,
+ const lambda::function<void(void)>& thunk);
+
+ static bool cancel(const Timer& timer);
+
static void pause();
static bool paused();
+
static void resume();
+
static void advance(const Duration& duration);
static void advance(ProcessBase* process, const Duration& duration);
+
static void update(const Time& time);
static void update(ProcessBase* process, const Time& time);
+
static void order(ProcessBase* from, ProcessBase* to);
// When the clock is paused, settle() synchronously ensures that:
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/delay.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/delay.hpp b/3rdparty/libprocess/include/process/delay.hpp
index 487f652..40627ea 100644
--- a/3rdparty/libprocess/include/process/delay.hpp
+++ b/3rdparty/libprocess/include/process/delay.hpp
@@ -4,6 +4,7 @@
#ifndef __PROCESS_DELAY_HPP__
#define __PROCESS_DELAY_HPP__
+#include <process/clock.hpp>
#include <process/dispatch.hpp>
#include <process/timer.hpp>
@@ -40,7 +41,7 @@ Timer delay(const Duration& duration,
dispatcher,
internal::canonicalize(method));
- return Timer::create(duration, dispatch);
+ return Clock::timer(duration, dispatch);
}
@@ -87,7 +88,7 @@ Timer delay(const Duration& duration,
dispatcher, \
internal::canonicalize(method)); \
\
- return Timer::create(duration, dispatch); \
+ return Clock::timer(duration, dispatch); \
} \
\
template <typename T, \
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 3ac1812..68e5f7b 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -18,6 +18,7 @@
#include <boost/type_traits.hpp>
#endif // __cplusplus < 201103L
+#include <process/clock.hpp>
#include <process/internal.hpp>
#include <process/latch.hpp>
#include <process/owned.hpp>
@@ -1491,7 +1492,7 @@ void after(
{
CHECK(!future.isPending());
if (latch->trigger()) {
- Timer::cancel(timer);
+ Clock::cancel(timer);
promise->associate(future);
}
}
@@ -1554,7 +1555,7 @@ Future<T> Future<T>::after(
// completed. Note that we do not pass a weak reference for this
// future as we don't want the future to get cleaned up and then
// have the timer expire.
- Timer timer = Timer::create(
+ Timer timer = Clock::timer(
duration,
lambda::bind(&internal::expired<T>, f, latch, promise, *this));
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/include/process/timer.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timer.hpp b/3rdparty/libprocess/include/process/timer.hpp
index e2f5563..e5d71f6 100644
--- a/3rdparty/libprocess/include/process/timer.hpp
+++ b/3rdparty/libprocess/include/process/timer.hpp
@@ -11,19 +11,14 @@
namespace process {
-// Timer support!
+// Timer represents a delayed thunk, that can get created (scheduled)
+// and canceled using the Clock.
class Timer
{
public:
Timer() : id(0), pid(process::UPID()), thunk(&abort) {}
- static Timer create(
- const Duration& duration,
- const lambda::function<void(void)>& thunk);
-
- static bool cancel(const Timer& timer);
-
bool operator == (const Timer& that) const
{
return id == that.id;
@@ -50,6 +45,8 @@ public:
}
private:
+ friend class Clock;
+
Timer(long _id,
const Timeout& _t,
const process::UPID& _pid,
http://git-wip-us.apache.org/repos/asf/mesos/blob/303ee1c3/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 5842705..9ebac08 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3160,7 +3160,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
}
-Timer Timer::create(
+Timer Clock::timer(
const Duration& duration,
const lambda::function<void(void)>& thunk)
{
@@ -3194,7 +3194,7 @@ Timer Timer::create(
}
-bool Timer::cancel(const Timer& timer)
+bool Clock::cancel(const Timer& timer)
{
bool canceled = false;
synchronized (timeouts) {
[21/30] mesos git commit: Introduce std::make_shared configure check.
Posted by be...@apache.org.
Introduce std::make_shared configure check.
Review: https://reviews.apache.org/r/27921
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9eda4331
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9eda4331
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9eda4331
Branch: refs/heads/master
Commit: 9eda4331dd23c3646aba1ec710e0dd3190e579ab
Parents: b7f7c98
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Nov 15 17:35:49 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Nov 15 17:38:21 2014 -0800
----------------------------------------------------------------------
m4/ax_cxx_compile_stdcxx_11.m4 | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9eda4331/m4/ax_cxx_compile_stdcxx_11.m4
----------------------------------------------------------------------
diff --git a/m4/ax_cxx_compile_stdcxx_11.m4 b/m4/ax_cxx_compile_stdcxx_11.m4
index 07e20bb..6a859b8 100644
--- a/m4/ax_cxx_compile_stdcxx_11.m4
+++ b/m4/ax_cxx_compile_stdcxx_11.m4
@@ -80,6 +80,8 @@ m4_define([_AX_CXX_COMPILE_STDCXX_11_testbody], [
p1->bar();
}
+ std::shared_ptr<int> k = std::make_shared<int>(2);
+
void mutexTest()
{
std::mutex _mutex;