You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/06/18 14:11:34 UTC
svn commit: r548337 - in /incubator/qpid/trunk/qpid/cpp: ./ src/ src/qpid/
src/qpid/log/ src/qpid/sys/ src/qpid/sys/epoll/ src/qpid/sys/posix/
src/tests/
Author: astitcher
Date: Mon Jun 18 05:11:32 2007
New Revision: 548337
URL: http://svn.apache.org/viewvc?view=rev&rev=548337
Log:
Intermediate checkin with preliminary work on epoll based net IO
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/configure.ac
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Mon Jun 18 05:11:32 2007
@@ -121,7 +121,6 @@
PKG_CHECK_MODULES([APR], [apr-1 >= $APR_MINIMUM_VERSION])
APR_CXXFLAGS="$APR_CFLAGS"
if test "$enable_APR_NETIO" = yes; then
- APR_CXXFLAGS+=" -DUSE_APR_NETIO=1"
USE_APR_NETIO=1
fi
if test "$enable_APR_PLATFORM" = yes; then
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Jun 18 05:11:32 2007
@@ -69,6 +69,7 @@
posix_plat_src = \
qpid/sys/posix/check.cpp \
+ qpid/sys/epoll/EpollPoller.cpp \
qpid/sys/posix/Socket.cpp \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp
@@ -155,7 +156,6 @@
gen/qpid/framing/AMQP_MethodVersionMap.cpp \
gen/qpid/framing/AMQP_ServerProxy.cpp \
qpid/Exception.cpp \
- qpid/ExceptionHolder.cpp \
qpid/Url.h \
qpid/Url.cpp \
qpid/QpidError.cpp \
@@ -345,6 +345,7 @@
qpid/framing/amqp_types_full.h \
qpid/sys/Acceptor.h \
qpid/sys/AtomicCount.h \
+ qpid/sys/Dispatcher.h \
qpid/sys/Condition.h \
qpid/sys/ConnectionInputHandler.h \
qpid/sys/ConnectionInputHandlerFactory.h \
@@ -352,6 +353,7 @@
qpid/sys/Module.h \
qpid/sys/Monitor.h \
qpid/sys/Mutex.h \
+ qpid/sys/Poller.h \
qpid/sys/ProducerConsumer.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Mon Jun 18 05:11:32 2007
@@ -42,18 +42,34 @@
Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); }
+Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
+
Exception::~Exception() throw() {}
const char* Exception::what() const throw() { return whatStr.c_str(); }
std::string Exception::toString() const throw() { return whatStr; }
-Exception* Exception::clone() const throw() { return new Exception(*this); }
+Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); }
void Exception::throwSelf() const { throw *this; }
ShutdownException::ShutdownException() : Exception("Shut down.") {}
EmptyException::EmptyException() : Exception("Empty.") {}
+
+const char* Exception::defaultMessage = "Unexpected exception";
+
+void Exception::log(const char* what, const char* message) {
+ QPID_LOG(error, message << ": " << what);
+}
+
+void Exception::log(const std::exception& e, const char* message) {
+ log(e.what(), message);
+}
+
+void Exception::logUnknown(const char* message) {
+ log("unknown exception.", message);
+}
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Mon Jun 18 05:11:32 2007
@@ -21,13 +21,15 @@
* under the License.
*
*/
+
+#include "framing/amqp_types.h"
+
#include <exception>
#include <string>
#include <memory>
#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>
-
-#include "framing/amqp_types.h"
+#include <boost/function.hpp>
namespace qpid
{
@@ -44,6 +46,10 @@
std::string whatStr;
public:
+ typedef boost::shared_ptr<Exception> shared_ptr;
+ typedef boost::shared_ptr<const Exception> shared_ptr_const;
+ typedef std::auto_ptr<Exception> auto_ptr;
+
Exception() throw();
Exception(const std::string& str) throw();
Exception(const char* str) throw();
@@ -59,10 +65,61 @@
virtual const char* what() const throw();
virtual std::string toString() const throw();
- virtual Exception* clone() const throw();
+ virtual auto_ptr clone() const throw();
virtual void throwSelf() const;
- typedef boost::shared_ptr<Exception> shared_ptr;
+ /** Default message: "Unknown exception" or something like it. */
+ static const char* defaultMessage;
+
+ /**
+ * Log a message of the form "message: what"
+ *@param what Exception's what() message.
+ *@param message Prefix message.
+ */
+ static void log(const char* what, const char* message = defaultMessage);
+
+ /**
+ * Log an exception.
+ *@param e Exception to log.
+
+ */
+ static void log(
+ const std::exception& e, const char* message = defaultMessage);
+
+
+ /**
+ * Log an unknown exception - use in catch(...)
+ *@param message Prefix message.
+ */
+ static void logUnknown(const char* message = defaultMessage);
+
+ /**
+ * Wrapper template function to call another function inside
+ * try/catch and log any exception. Use boost::bind to wrap
+ * member function calls or functions with arguments.
+ *
+ *@param f Function to call in try block.
+ *@param retrhow If true the exception is rethrown.
+ *@param message Prefix message.
+ */
+ template <class T>
+ static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
+ const char* message=defaultMessage)
+ {
+ try {
+ return f();
+ }
+ catch (const std::exception& e) {
+ log(e, message);
+ if (rethrow)
+ throw;
+ }
+ catch (...) {
+ logUnknown(message);
+ if (rethrow)
+ throw;
+ }
+ }
};
struct ChannelException : public Exception {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp Mon Jun 18 05:11:32 2007
@@ -30,7 +30,7 @@
QpidError::~QpidError() throw() {}
-Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); }
void QpidError::throwSelf() const { throw *this; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h Mon Jun 18 05:11:32 2007
@@ -52,7 +52,7 @@
{ init(); }
~QpidError() throw();
- Exception* clone() const throw();
+ Exception::auto_ptr clone() const throw();
void throwSelf() const;
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Mon Jun 18 05:11:32 2007
@@ -19,7 +19,6 @@
*
*/
-#include "qpid/log/Statement.h"
#include <boost/current_function.hpp>
#include <sstream>
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Dispatcher.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+Dispatcher::Dispatcher(Poller::shared_ptr poller0) :
+ poller(poller0) {
+}
+
+Dispatcher::~Dispatcher() {
+}
+
+void Dispatcher::run() {
+ do {
+ Poller::Event event = poller->wait();
+ // Poller::wait guarantees to return an event
+ DispatchHandle* h = static_cast<DispatchHandle*>(event.handle);
+ switch (event.dir) {
+ case Poller::IN:
+ h->readableCallback(*h);
+ break;
+ case Poller::OUT:
+ h->writableCallback(*h);
+ break;
+ case Poller::INOUT:
+ h->readableCallback(*h);
+ h->writableCallback(*h);
+ break;
+ case Poller::SHUTDOWN:
+ goto dispatcher_shutdown;
+ default:
+ ;
+ }
+ } while (true);
+
+dispatcher_shutdown:
+ ;
+}
+
+void DispatchHandle::watch(Poller::shared_ptr poller0) {
+ bool r = readableCallback;
+ bool w = writableCallback;
+
+ // If no callbacks set then do nothing (that is what we were asked to do!)
+ // TODO: Maybe this should be an assert instead
+ if (!r && !w)
+ return;
+
+ Poller::Direction d = r ?
+ (w ? Poller::INOUT : Poller::IN) :
+ Poller::OUT;
+
+ poller = poller0;
+ poller->addFd(*this, d);
+}
+
+void DispatchHandle::rewatch() {
+ assert(poller);
+ poller->rearmFd(*this);
+}
+
+void DispatchHandle::unwatch() {
+ poller->delFd(*this);
+ poller.reset();
+}
+
+}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Mon Jun 18 05:11:32 2007
@@ -0,0 +1,74 @@
+#ifndef _sys_Dispatcher_h
+#define _sys_Dispatcher_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "Poller.h"
+#include "Runnable.h"
+
+#include <memory>
+#include <boost/function.hpp>
+
+#include <assert.h>
+
+
+namespace qpid {
+namespace sys {
+
+class Dispatcher;
+class DispatchHandle : public PollerHandle {
+ friend class Dispatcher;
+public:
+ typedef boost::function1<void, DispatchHandle&> Callback;
+
+private:
+ Callback readableCallback;
+ Callback writableCallback;
+ Poller::shared_ptr poller;
+
+public:
+
+ DispatchHandle(int fd, Callback rCb, Callback wCb) :
+ PollerHandle(fd),
+ readableCallback(rCb),
+ writableCallback(wCb)
+ {}
+
+ void watch(Poller::shared_ptr poller);
+ void rewatch();
+ void unwatch();
+};
+
+class Dispatcher : public Runnable {
+ const Poller::shared_ptr poller;
+
+public:
+ Dispatcher(Poller::shared_ptr poller);
+ ~Dispatcher();
+
+ void run();
+};
+
+}}
+
+#endif // _sys_Dispatcher_h
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon Jun 18 05:11:32 2007
@@ -0,0 +1,93 @@
+#ifndef _sys_Poller_h
+#define _sys_Poller_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+#include <stdint.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Handle class to use for polling
+ */
+class Poller;
+class PollerHandlePrivate;
+class PollerHandle {
+ friend class Poller;
+
+ PollerHandlePrivate* impl;
+ const int fd;
+
+public:
+ PollerHandle(int fd0);
+ virtual ~PollerHandle();
+
+ int getFD() const { return fd; }
+};
+
+/**
+ * Poller: abstract class to encapsulate a file descriptor poll to be used
+ * by a reactor
+ */
+class PollerPrivate;
+class Poller {
+ PollerPrivate* impl;
+
+public:
+ typedef boost::shared_ptr<Poller> shared_ptr;
+
+ enum Direction {
+ NONE,
+ IN,
+ OUT,
+ INOUT,
+ SHUTDOWN
+ };
+
+ struct Event {
+ PollerHandle* handle;
+ Direction dir;
+
+ Event(PollerHandle* handle0, Direction dir0) :
+ handle(handle0),
+ dir(dir0) {
+ }
+ };
+
+ Poller();
+ ~Poller();
+ void shutdown();
+
+ void addFd(PollerHandle& handle, Direction dir);
+ void delFd(PollerHandle& handle);
+ void modFd(PollerHandle& handle, Direction dir);
+ void rearmFd(PollerHandle& handle);
+ Event wait(Duration timeout = TIME_INFINITE);
+};
+
+}}
+#endif // _sys_Poller_h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h Mon Jun 18 05:11:32 2007
@@ -20,19 +20,27 @@
*/
#include <boost/noncopyable.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
-/** Increment counter in constructor and decrement in destructor. */
+/**
+ * Increment counter in constructor and decrement in destructor.
+ * Optionally call a function if the decremented counter value is 0.
+ * Note the function must not throw, it is called in the destructor.
+ */
template <class T>
class ScopedIncrement : boost::noncopyable
{
public:
- ScopedIncrement(T& c) : count(c) { ++count; }
- ~ScopedIncrement() { --count; }
+ ScopedIncrement(T& c, boost::function0<void> f=0)
+ : count(c), callback(f) { ++count; }
+ ~ScopedIncrement() { if (--count == 0 && callback) callback(); }
+
private:
T& count;
+ boost::function0<void> callback;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Mon Jun 18 05:11:32 2007
@@ -97,8 +97,12 @@
/** Nanoseconds per nanosecond. */
const Duration TIME_NSEC = 1;
+/** Value to represent an infinite timeout */
+const Duration TIME_INFINITE = std::numeric_limits<int64_t>::max();
+
/** Time greater than any other time */
const AbsTime FAR_FUTURE = AbsTime::FarFuture();
+
}}
#endif /*!_sys_Time_h*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/posix/check.h"
+
+#include <sys/epoll.h>
+#include <errno.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerHandle;
+
+ enum FDStat {
+ ABSENT,
+ MONITORED,
+ INACTIVE
+ };
+
+ ::__uint32_t events;
+ FDStat stat;
+ Mutex lock;
+
+ PollerHandlePrivate() :
+ events(0),
+ stat(ABSENT) {
+ }
+};
+
+PollerHandle::PollerHandle(int fd0) :
+ impl(new PollerHandlePrivate),
+ fd(fd0)
+{}
+
+PollerHandle::~PollerHandle() {
+ delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Linux specific epoll
+ * interface
+ */
+class PollerPrivate {
+ friend class Poller;
+
+ static const int DefaultFds = 256;
+
+ struct ReadablePipe {
+ int fds[2];
+
+ /**
+ * This encapsulates an always readable pipe which we can add
+ * to the epoll set to force epoll_wait to return
+ */
+ ReadablePipe() {
+ QPID_POSIX_CHECK(::pipe(fds));
+ // Just write the pipe's fds to the pipe
+ QPID_POSIX_CHECK(::write(fds[1], fds, 2));
+ }
+
+ ~ReadablePipe() {
+ ::close(fds[0]);
+ ::close(fds[1]);
+ }
+
+ int getFD() {
+ return fds[0];
+ }
+ };
+
+ static ReadablePipe alwaysReadable;
+
+ const int epollFd;
+ bool isShutdown;
+
+ static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
+ switch (dir) {
+ case Poller::IN: return ::EPOLLIN;
+ case Poller::OUT: return ::EPOLLOUT;
+ case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
+ default: return 0;
+ }
+ }
+
+ static Poller::Direction epollToDirection(::__uint32_t events) {
+ ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
+ switch (e) {
+ case ::EPOLLIN: return Poller::IN;
+ case ::EPOLLOUT: return Poller::OUT;
+ case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
+ default: return Poller::NONE;
+ }
+ }
+
+ PollerPrivate() :
+ epollFd(::epoll_create(DefaultFds)),
+ isShutdown(false) {
+ QPID_POSIX_CHECK(epollFd);
+ }
+
+ ~PollerPrivate() {
+ // It's probably okay to ignore any errors here as there can't be data loss
+ ::close(epollFd);
+ }
+};
+
+PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+
+void Poller::addFd(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ ::epoll_event epe;
+ int op;
+
+ if (eh.stat == PollerHandlePrivate::ABSENT) {
+ op = EPOLL_CTL_ADD;
+ epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ } else {
+ assert(eh.stat == PollerHandlePrivate::MONITORED);
+ op = EPOLL_CTL_MOD;
+ epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
+ }
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+
+ // Record monitoring state of this fd
+ eh.events = epe.events;
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::delFd(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat != PollerHandlePrivate::ABSENT);
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0));
+ eh.stat = PollerHandlePrivate::ABSENT;
+}
+
+// modFd is equivalent to delFd followed by addFd
+void Poller::modFd(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat != PollerHandlePrivate::ABSENT);
+
+ ::epoll_event epe;
+ epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+
+ // Record monitoring state of this fd
+ eh.events = epe.events;
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::rearmFd(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.stat == PollerHandlePrivate::INACTIVE);
+
+ ::epoll_event epe;
+ epe.events = eh.events;
+ epe.data.ptr = &handle;
+
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+
+ eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::shutdown() {
+ // Don't use any locking here - isshutdown will be visible to all
+ // after the epoll_ctl() anyway (it's a memory barrier)
+ impl->isShutdown = true;
+
+ // Add always readable fd to epoll (not EPOLLONESHOT)
+ int fd = impl->alwaysReadable.getFD();
+ ::epoll_event epe;
+ epe.events = ::EPOLLIN;
+ epe.data.ptr = 0;
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+}
+
+Poller::Event Poller::wait(Duration timeout) {
+ epoll_event epe;
+ int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+
+ // Repeat until we weren't interupted
+ do {
+ int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
+
+ if (impl->isShutdown) {
+ return Event(0, SHUTDOWN);
+ }
+
+ if (rc ==-1 && errno != EINTR) {
+ QPID_POSIX_CHECK(rc);
+ } else if (rc > 0) {
+ assert(rc == 1);
+ PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+ PollerHandlePrivate& eh = *handle->impl;
+
+ ScopedLock<Mutex> l(eh.lock);
+
+ // the handle could have gone inactive since we left the epoll_wait
+ if (eh.stat == PollerHandlePrivate::MONITORED) {
+ eh.stat = PollerHandlePrivate::INACTIVE;
+ return Event(handle, PollerPrivate::epollToDirection(epe.events));
+ }
+ }
+ // We only get here if one of the following:
+ // * epoll_wait was interrupted by a signal
+ // * epoll_wait timed out
+ // * the state of the handle changed after being returned by epoll_wait
+ //
+ // The only things we can do here are return a timeout or wait more.
+ // Obviously if we timed out we return timeout; if the wait was meant to
+ // be indefinite then we should never return with a time out so we go again.
+ // If the wait wasn't indefinite, but we were interrupted then we have to return
+ // with a timeout as we don't know how long we've waited so far and so we can't
+ // continue the wait.
+ if (rc == 0 || timeoutMs == -1) {
+ return Event(0, NONE);
+ }
+ } while (true);
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp Mon Jun 18 05:11:32 2007
@@ -1,4 +1,4 @@
-/*
+/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
@@ -16,6 +16,19 @@
*
*/
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
+#include "EventChannel.h"
+#include "check.h"
+
+#include "qpid/QpidError.h"
+#include "qpid/sys/AtomicCount.h"
+
#include <mqueue.h>
#include <string.h>
#include <iostream>
@@ -29,139 +42,420 @@
#include <queue>
#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
-
-#include "qpid/QpidError.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/log/Statement.h"
-
-#include "check.h"
-#include "EventChannel.h"
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
using namespace std;
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
namespace qpid {
namespace sys {
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+
+typedef std::pair<Event*, Event*> EventPair;
+
/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
+ * Template to zero out a C-struct on construction. Avoids uninitialized memory
+ * warnings from valgrind or other mem checking tool.
+ */
+template <class T> struct CleanStruct : public T {
+ CleanStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+} // namespace
+
+/**
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
{
public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
+ Queue(Descriptor& container, Direction dir);
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
+ /** Called by Event classes in prepare() */
+ void push(Event* e);
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
+ /** Called when epoll wakes.
+ *@return The next completed event or 0.
+ */
+ Event* wake(uint32_t epollFlags);
+
+ Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
+
+ bool empty() { return queue.empty(); }
+
+ void setBit(uint32_t &epollFlags);
+
+ void shutdown();
private:
+ typedef std::deque<Event*> EventQ;
+
+ inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
+
+ Mutex& lock; // Shared with Descriptor.
+ Descriptor& descriptor;
+ uint32_t myEvent; // Epoll event flag.
+ EventQ queue;
+};
+
+
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+ public:
+ explicit Descriptor(int fd) : epollFd(-1), myFd(fd),
+ inQueue(*this, IN), outQueue(*this, OUT) {}
+
+ void activate(int epollFd_);
+
+ /** Epoll woke up for this descriptor. */
+ Event* wake(uint32_t epollEvents);
+
+ /** Shut down: close and remove file descriptor.
+ * May be re-activated if fd is reused.
+ */
+ void shutdown();
+
+ // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+ void shutdownUnsafe();
+
+ bool isShutdown() { return epollFd == -1; }
+
+ Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+ int getFD() const { return myFd; }
+
+ private:
+ void update();
+ void epollCtl(int op, uint32_t events);
+ Queue* pick();
+
+ Mutex lock;
int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
+ int myFd;
+ Queue inQueue, outQueue;
+ bool preferIn;
+
+ friend class Queue;
};
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+/**
+ * Holds a map of Descriptors, which do most of the work.
+ */
+class EventChannel::Impl {
+ public:
+ Impl(int size = 256);
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
- //
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
+ ~Impl();
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
+ /**
+ * Activate descriptor
+ */
+ void activate(Descriptor& d) {
+ d.activate(epollFd);
+ }
+
+ /** Wait for an event, return 0 on timeout */
+ Event* wait(Duration timeout);
+
+ void shutdown();
+
+ private:
+
+ Monitor monitor;
+ int epollFd;
+ int shutdownPipe[2];
+ AtomicCount nWaiters;
+ bool isShutdown;
+};
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
+ lock(d.lock), descriptor(d),
+ myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+ Mutex::ScopedLock l(lock);
+ if (descriptor.isShutdown())
+ THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+ queue.push_back(e);
+ descriptor.update();
+}
+
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+ if (queue.empty())
+ epollFlags &= ~myEvent;
+ else
+ epollFlags |= myEvent;
+}
+
+// TODO aconway 2006-12-20: REMOVE
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+ // Called with lock held.
+ if (!queue.empty() && (isMyEvent(epollFlags))) {
+ assert(!queue.empty());
+ Event* e = queue.front();
+ assert(e);
+ if (!e->getException()) {
+ // TODO aconway 2006-12-20: Can/should we move event completion
+ // out into dispatch() so it doesn't happen in Descriptor locks?
+ e->complete(descriptor);
+ }
+ queue.pop_front();
+ return e;
+ }
+ return 0;
+}
+
+void EventChannel::Queue::shutdown() {
+ // Mark all pending events with a shutdown exception.
+ // The server threads will remove and dispatch the events.
+ //
+ qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE);
+ for_each(queue.begin(), queue.end(),
+ boost::bind(&Event::setException, _1, ex));
}
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_) {
+ Mutex::ScopedLock l(lock);
+ if (isShutdown()) {
+ epollFd = epollFd_; // We're back in business.
+ epollCtl(EPOLL_CTL_ADD, 0);
+ }
}
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+void EventChannel::Descriptor::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownUnsafe();
+}
+
+void EventChannel::Descriptor::shutdownUnsafe() {
+ // Caller holds lock.
+ ::close(myFd);
+ epollFd = -1; // Mark myself as shutdown.
+ inQueue.shutdown();
+ outQueue.shutdown();
+}
+
+// TODO aconway 2006-12-20: Inline into wake().
+void EventChannel::Descriptor::update() {
+ // Caller holds lock.
+ if (isShutdown()) // Nothing to do
+ return;
+ uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+ inQueue.setBit(events);
+ outQueue.setBit(events);
+ epollCtl(EPOLL_CTL_MOD, events);
}
+
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+ // Caller holds lock
+ assert(!isShutdown());
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = this;
+ ee.events = events;
+ int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+ if (status < 0) {
+ if (errno == EEXIST) // It's okay to add an existing fd
+ return;
+ else if (errno == EBADF) // FD was closed externally.
+ shutdownUnsafe();
+ else
+ throw QPID_POSIX_ERROR(errno);
+ }
+}
+
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
+EventChannel::Queue* EventChannel::Descriptor::pick() {
+ if (inQueue.empty() && outQueue.empty())
return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
+ if (inQueue.empty() || outQueue.empty())
+ return !inQueue.empty() ? &inQueue : &outQueue;
+ // Neither is empty, pick fairly.
+ preferIn = !preferIn;
+ return preferIn ? &inQueue : &outQueue;
+}
+
+Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
+ Mutex::ScopedLock l(lock);
+ // On error, shut down the Descriptor and both queues.
+ if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+ shutdownUnsafe();
+ // TODO aconway 2006-12-20: This error handling models means
+ // that any error reported by epoll will result in a shutdown
+ // exception on the events. Can we get more accurate error
+ // reporting somehow?
+ }
+ Queue*q = 0;
+ bool in = (epollEvents & EPOLLIN);
+ bool out = (epollEvents & EPOLLOUT);
+ if ((in && out) || isShutdown())
+ q = pick(); // Choose fairly, either non-empty queue.
+ else if (in)
+ q = &inQueue;
+ else if (out)
+ q = &outQueue;
+ Event* e = (q && !q->empty()) ? q->pop() : 0;
+ update();
+ if (e)
+ e->complete(*this);
+ return e;
}
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+ epollFd(-1), isShutdown(false)
{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+ // Create the epoll file descriptor.
+ epollFd = epoll_create(epollSize);
+ QPID_POSIX_CHECK(epollFd);
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // We activate the FD when there are messages in the queue.
+ QPID_POSIX_CHECK(::pipe(shutdownPipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
}
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+EventChannel::Impl::~Impl() {
+ shutdown();
+ ::close(epollFd);
+ ::close(shutdownPipe[0]);
+ ::close(shutdownPipe[1]);
}
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
+
+void EventChannel::Impl::shutdown() {
+ Monitor::ScopedLock l(monitor);
+ if (!isShutdown) { // I'm starting shutdown.
+ isShutdown = true;
+ if (nWaiters == 0)
+ return;
+
+ // TODO aconway 2006-12-20: If I just close the epollFd will
+ // that wake all threads? If so with what? Would be simpler than:
+
+ CleanStruct<epoll_event> ee;
+ ee.data.ptr = 0;
+ ee.events = EPOLLIN;
+ QPID_POSIX_CHECK(
+ epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
+ }
+ // Wait for nWaiters to get out.
+ while (nWaiters > 0) {
+ monitor.wait();
+ }
}
-Event* EventHandler::complete(EventHandler& eh)
-{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
+// TODO aconway 2006-12-20: DEBUG remove
+struct epoll {
+ epoll(uint32_t e) : events(e) { }
+ uint32_t events;
+};
+
+#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
+ostream& operator << (ostream& out, epoll e) {
+ out << "epoll_event.events: ";
+ BIT(EPOLLIN);
+ BIT(EPOLLPRI);
+ BIT(EPOLLOUT);
+ BIT(EPOLLRDNORM);
+ BIT(EPOLLRDBAND);
+ BIT(EPOLLWRNORM);
+ BIT(EPOLLWRBAND);
+ BIT(EPOLLMSG);
+ BIT(EPOLLERR);
+ BIT(EPOLLHUP);
+ BIT(EPOLLONESHOT);
+ BIT(EPOLLET);
+ return out;
}
+
+
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Duration timeoutNs)
+{
+ {
+ Monitor::ScopedLock l(monitor);
+ if (isShutdown)
+ throw ShutdownException();
+ }
+
+ // Increase nWaiters for the duration, notify the monitor if I'm
+ // the last one out.
+ //
+ AtomicCount::ScopedIncrement si(
+ nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
+
+ // No lock, all thread safe calls or local variables:
+ //
+ const long timeoutMs =
+ (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+ CleanStruct<epoll_event> ee;
+ Event* event = 0;
+
+ // Loop till we get a completed event. Some events may repost
+ // themselves and return 0, e.g. incomplete read or write events.
+ //TODO aconway 2006-12-20: FIX THIS!
+ while (!event) {
+ int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+ if (n == 0) // Timeout
+ return 0;
+ if (n < 0 && errno == EINTR) // Interrupt, ignore it.
+ continue;
+ if (n < 0)
+ throw QPID_POSIX_ERROR(errno);
+ assert(n == 1);
+ Descriptor* ed =
+ reinterpret_cast<Descriptor*>(ee.data.ptr);
+ if (ed == 0) // We're being shut-down.
+ throw ShutdownException();
+ assert(ed != 0);
+ event = ed->wake(ee.events);
+ }
+ return event;
+}
+
+//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+// Mutex::ScopedLock l(monitor);
+// Descriptor& ed = descriptors[fd];
+// ed.activate(epollFd, fd);
+// return ed;
+//}
+
+
// ================================================================
// EventChannel
@@ -169,157 +463,134 @@
return shared_ptr(new EventChannel());
}
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
EventChannel::~EventChannel() {}
-void EventChannel::postEvent(Event& e)
+void EventChannel::post(Event& e)
{
- e.prepare(*handler);
+ e.prepare(*impl);
}
-Event* EventChannel::getEvent()
+Event* EventChannel::wait(Duration timeoutNs)
{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
+ return impl->wait(timeoutNs);
+}
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- QPID_LOG(warn, "Ignoring error: "
- << PosixError::getMessage(errno));
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
+void EventChannel::shutdown() {
+ impl->shutdown();
}
+
+// ================================================================
+// Event and subclasses.
+
Event::~Event() {}
-void Event::prepare(EventHandler& handler)
-{
- handler.mqPut(this);
+Exception::shared_ptr_const Event::getException() const {
+ return exception;
}
-bool Event::hasError() const {
- return error;
+void Event::throwIfException() {
+ if (getException())
+ exception->throwSelf();
}
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
+void Event::dispatch()
{
- return this;
+ if (!callback.empty())
+ callback();
}
-void Event::dispatch()
-{
+void Event::setException(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex)
+ exception.reset(ex->clone().release());
+ else
+ exception.reset(new Exception(e));
+#ifndef NDEBUG
+ // Throw and re-catch the exception. Has no effect on the
+ // program but it triggers debuggers watching for throw. The
+ // context that sets the exception is more informative for
+ // debugging purposes than the one that ultimately throws it.
+ //
try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ throwIfException();
}
+ catch (...) { } // Ignored.
+#endif
}
-void Event::setError(const ExceptionHolder& e) {
- error = e;
+int FDEvent::getFDescriptor() const {
+ return descriptor.getFD();
}
-void ReadEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak
+ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) :
+ IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) {
}
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(IN).push(this);
}
-Event* ReadEvent::complete(EventHandler& handler)
+void ReadEvent::complete(EventChannel::Descriptor& ed)
{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
- return 0;
- }
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ ssize_t n = ::read(getFDescriptor(),
+ static_cast<char*>(buffer) + bytesRead,
+ size - bytesRead);
+ if (n > 0)
+ bytesRead += n;
+ if (n == 0 || (n < 0 && errno != EAGAIN)) {
+ // Use ENODATA for file closed.
+ setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
+ ed.shutdownUnsafe();
}
}
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) :
+ IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesWritten(0) {
+}
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(OUT).push(this);
}
-Event* WriteEvent::complete(EventHandler& handler)
+
+void WriteEvent::complete(EventChannel::Descriptor& ed)
{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
+ ssize_t n = ::write(getFDescriptor(),
+ static_cast<const char*>(buffer) + bytesWritten,
+ size - bytesWritten);
+ if (n > 0)
+ bytesWritten += n;
+ if(n < 0 && errno != EAGAIN) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
- return this;
}
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+AcceptEvent::AcceptEvent(int fd, Callback cb) :
+ FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) {
+}
+
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+ EventChannel::Descriptor& d = getDescriptor();
+ impl.activate(d);
+ d.getQueue(IN).push(this);
}
-Event* AcceptEvent::complete(EventHandler& handler)
+void AcceptEvent::complete(EventChannel::Descriptor& ed)
{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
- return this;
+ accepted = ::accept(getFDescriptor(), 0, 0);
+ if (accepted < 0) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h Mon Jun 18 05:11:32 2007
@@ -20,7 +20,10 @@
*/
#include "qpid/SharedObject.h"
-#include "qpid/ExceptionHolder.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+
#include <boost/function.hpp>
#include <memory>
@@ -28,11 +31,57 @@
namespace sys {
class Event;
-class EventHandler;
-class EventChannel;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ /** Exception throw from wait() if channel is shut down. */
+ class ShutdownException : public qpid::Exception {};
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void post(Event& event);
+
+ /**
+ * Wait for the next complete event, up to timeout.
+ *@return Pointer to event or 0 if timeout elapses.
+ *@exception ShutdownException if the channel is shut down.
+ */
+ Event* wait(Duration timeout = TIME_INFINITE);
+
+ /**
+ * Shut down the event channel.
+ * Blocks till all threads have exited wait()
+ */
+ void shutdown();
+
+
+ // Internal classes.
+ class Impl;
+ class Queue;
+ class Descriptor;
+
+ private:
+
+ EventChannel();
+
+ Mutex lock;
+ boost::shared_ptr<Impl> impl;
+};
/**
* Base class for all Events.
+ *
+ * Derived classes define events representing various async IO operations.
+ * When an event is complete, it is returned by the EventChannel to
+ * a thread calling wait. The thread will call Event::dispatch() to
+ * execute code associated with event completion.
*/
class Event
{
@@ -40,132 +89,121 @@
/** Type for callback when event is dispatched */
typedef boost::function0<void> Callback;
- /**
- * Create an event with optional callback.
- * Instances of Event are sent directly through the channel.
- * Derived classes define additional waiting behaviour.
- *@param cb A callback functor that is invoked when dispatch() is called.
- */
- Event(Callback cb = 0) : callback(cb) {}
-
virtual ~Event();
/** Call the callback provided to the constructor, if any. */
void dispatch();
- /** True if there was an error processing this event */
- bool hasError() const;
+ /**
+ *If there was an exception processing this Event, return it.
+ *@return 0 if there was no exception.
+ */
+ qpid::Exception::shared_ptr_const getException() const;
+
+ /** If getException() throw the corresponding exception. */
+ void throwIfException();
- /** If hasError() throw the corresponding exception. */
- void throwIfError() throw(Exception);
+ /** Set the dispatch callback. */
+ void setCallback(Callback cb) { callback = cb; }
+
+ /** Set the exception. */
+ void setException(const std::exception& e);
protected:
- virtual void prepare(EventHandler&);
- virtual Event* complete(EventHandler&);
- void setError(const ExceptionHolder& e);
+ Event(Callback cb=0) : callback(cb) {}
+
+ virtual void prepare(EventChannel::Impl&) = 0;
+ virtual void complete(EventChannel::Descriptor&) = 0;
Callback callback;
- ExceptionHolder error;
+ Exception::shared_ptr_const exception;
friend class EventChannel;
- friend class EventHandler;
+ friend class EventChannel::Queue;
};
-template <class BufT>
-class IOEvent : public Event {
+/** Base class for events related to a file descriptor */
+class FDEvent : public Event {
+ public:
+ EventChannel::Descriptor& getDescriptor() const { return descriptor; }
+ int getFDescriptor() const;
+
+ protected:
+ FDEvent(Callback cb, EventChannel::Descriptor& fd)
+ : Event(cb), descriptor(fd) {}
+ // TODO AMS: 1/6/07 I really don't think this is correct, but
+ // the descriptor is immutable
+ FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; }
+
+ private:
+ EventChannel::Descriptor& descriptor;
+};
+
+/** Base class for read or write events. */
+class IOEvent : public FDEvent {
public:
- void getDescriptor() const { return descriptor; }
size_t getSize() const { return size; }
- BufT getBuffer() const { return buffer; }
-
+
protected:
- IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
- Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+ IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) :
+ FDEvent(cb, fd), size(sz), noWait(noWait_) {}
- int descriptor;
- BufT buffer;
size_t size;
+ bool noWait;
};
/** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
+class ReadEvent : public IOEvent
{
public:
- explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
- IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+ explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false);
+ void* getBuffer() const { return buffer; }
+ size_t getBytesRead() const { return bytesRead; }
+
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
ssize_t doRead();
- size_t received;
+ void* buffer;
+ size_t bytesRead;
};
/** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
+class WriteEvent : public IOEvent
{
public:
- explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
- Callback cb=0) :
- IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+ explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0);
- protected:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ const void* getBuffer() const { return buffer; }
+ size_t getBytesWritten() const { return bytesWritten; }
private:
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
ssize_t doWrite();
- size_t written;
+
+ const void* buffer;
+ size_t bytesWritten;
};
+
/** Asynchronous socket accept event */
-class AcceptEvent : public Event
+class AcceptEvent : public FDEvent
{
public:
/** Accept a connection on fd. */
- explicit AcceptEvent(int fd=-1, Callback cb=0) :
- Event(cb), descriptor(fd), accepted(0) {}
-
- /** Get descriptor for server socket */
+ explicit AcceptEvent(int fd, Callback cb=0);
+
+ /** Get descriptor for accepted server socket */
int getAcceptedDesscriptor() const { return accepted; }
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ void complete(EventChannel::Descriptor&);
- int descriptor;
int accepted;
-};
-
-
-class QueueSet;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static shared_ptr create();
-
- ~EventChannel();
-
- /** Post an event to the channel. */
- void postEvent(Event& event);
-
- /** Post an event to the channel. Must not be 0. */
- void postEvent(Event* event) { postEvent(*event); }
-
- /**
- * Wait for the next complete event.
- *@return Pointer to event. Will never return 0.
- */
- Event* getEvent();
-
- private:
- EventChannel();
- boost::shared_ptr<EventHandler> handler;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp Mon Jun 18 05:11:32 2007
@@ -106,7 +106,7 @@
if (!isRunning && !isShutdown) {
isRunning = true;
factory = f;
- threads->postEvent(acceptEvent);
+ threads->post(acceptEvent);
}
}
threads->join(); // Wait for shutdown.
@@ -120,7 +120,7 @@
isShutdown = true;
}
if (doShutdown) {
- ::close(acceptEvent.getDescriptor());
+ ::close(acceptEvent.getFDescriptor());
threads->shutdown();
for_each(connections.begin(), connections.end(),
boost::bind(&EventChannelConnection::close, _1));
@@ -139,11 +139,11 @@
shutdown();
return;
}
- // TODO aconway 2006-11-29: Need to reap closed connections also.
int fd = acceptEvent.getAcceptedDesscriptor();
+ threads->post(acceptEvent); // Keep accepting.
+ // TODO aconway 2006-11-29: Need to reap closed connections also.
connections.push_back(
new EventChannelConnection(threads, *factory, fd, fd, isTrace));
- threads->postEvent(acceptEvent); // Keep accepting.
}
}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp Mon Jun 18 05:11:32 2007
@@ -24,7 +24,6 @@
#include "EventChannelConnection.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/QpidError.h"
-#include "qpid/log/Statement.h"
using namespace std;
using namespace qpid;
@@ -44,6 +43,8 @@
) :
readFd(rfd),
writeFd(wfd ? wfd : rfd),
+ readEvent(readFd),
+ writeEvent(writeFd),
readCallback(boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endInitRead)),
@@ -55,8 +56,8 @@
out(bufferSize),
isTrace(isTrace_)
{
- BOOST_ASSERT(readFd > 0);
- BOOST_ASSERT(writeFd > 0);
+ assert(readFd > 0);
+ assert(writeFd > 0);
closeOnException(&EventChannelConnection::startRead);
}
@@ -133,14 +134,17 @@
}
// No need to lock here - only one thread can be writing at a time.
out.clear();
- QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame);
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << *frame << endl;
frame->encode(out);
out.flip();
+ // TODO: AMS 1/6/07 This only works because we already have the correct fd
+ // in the descriptor - change not to use assigment
writeEvent = WriteEvent(
writeFd, out.start(), out.available(),
boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endWrite));
- threads->postEvent(writeEvent);
+ threads->post(writeEvent);
}
// ScopedBusy ctor increments busyThreads.
@@ -161,12 +165,18 @@
ScopedBusy(*this);
{
Monitor::ScopedLock lock(monitor);
+ assert(isWriting);
isWriting = false;
- if (isClosed)
+ if (isClosed)
return;
writeEvent.throwIfException();
+ if (writeEvent.getBytesWritten() < writeEvent.getSize()) {
+ // Keep writing the current event till done.
+ isWriting = true;
+ threads->post(writeEvent);
+ }
}
- // Check if there's more in to write in the write queue.
+ // Continue writing from writeFrames queue.
startWrite();
}
@@ -179,8 +189,8 @@
void EventChannelConnection::startRead() {
// Non blocking read, as much as we can swallow.
readEvent = ReadEvent(
- readFd, in.start(), in.available(), readCallback,true);
- threads->postEvent(readEvent);
+ readFd, in.start(), in.available(), readCallback);
+ threads->post(readEvent);
}
// Completion of initial read, expect protocolInit.
@@ -194,7 +204,7 @@
in.flip();
ProtocolInitiation protocolInit;
if(protocolInit.decode(in)){
- handler->initiated(&protocolInit);
+ handler->initiated(protocolInit);
readCallback = boost::bind(
&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endRead);
@@ -215,8 +225,10 @@
in.flip();
AMQFrame frame;
while (frame.decode(in)) {
- QPID_LOG(trace, "Received on socket " << readFd
- << ": " << frame);
+ // TODO aconway 2006-11-30: received should take Frame&
+ if (isTrace)
+ cout << "Received on socket " << readFd
+ << ": " << frame << endl;
handler->received(&frame);
}
in.compact();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h Mon Jun 18 05:11:32 2007
@@ -34,7 +34,7 @@
class ConnectionInputHandlerFactory;
/**
- * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
+ * Implements SessionContext and delegates to a SessionHandler
* for a connection via the EventChannel.
*@param readDescriptor file descriptor for reading.
*@param writeDescriptor file descriptor for writing,
@@ -50,7 +50,7 @@
bool isTrace = false
);
- // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
+ // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
virtual void send(qpid::framing::AMQFrame* frame) {
send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp Mon Jun 18 05:11:32 2007
@@ -16,27 +16,40 @@
*
*/
-#include "EventChannelThreads.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
#include <iostream>
-using namespace std;
+#include <limits>
+
#include <boost/bind.hpp>
+#include "qpid/sys/Runnable.h"
+
+#include "EventChannelThreads.h"
+
namespace qpid {
namespace sys {
+const size_t EventChannelThreads::unlimited =
+ std::numeric_limits<size_t>::max();
+
EventChannelThreads::shared_ptr EventChannelThreads::create(
- EventChannel::shared_ptr ec)
+ EventChannel::shared_ptr ec, size_t min, size_t max
+)
{
- return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+ return EventChannelThreads::shared_ptr(
+ new EventChannelThreads(ec, min, max));
}
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
- channel(ec), nWaiting(0), state(RUNNING)
+EventChannelThreads::EventChannelThreads(
+ EventChannel::shared_ptr ec, size_t min, size_t max) :
+ minThreads(std::max(size_t(1), min)),
+ maxThreads(std::min(min, max)),
+ channel(ec),
+ nWaiting(0),
+ state(RUNNING)
{
- // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
- addThread();
+ Monitor::ScopedLock l(monitor);
+ while (workers.size() < minThreads)
+ workers.push_back(Thread(*this));
}
EventChannelThreads::~EventChannelThreads() {
@@ -46,32 +59,30 @@
void EventChannelThreads::shutdown()
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
if (state != RUNNING) // Already shutting down.
return;
- for (size_t i = 0; i < workers.size(); ++i) {
- channel->postEvent(terminate);
- }
- state = TERMINATE_SENT;
- notify(); // Wake up one join() thread.
+ state = TERMINATING;
+ channel->shutdown();
+ monitor.notify(); // Wake up one join() thread.
}
void EventChannelThreads::join()
{
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
while (state == RUNNING) // Wait for shutdown to start.
- wait();
+ monitor.wait();
if (state == SHUTDOWN) // Shutdown is complete
return;
if (state == JOINING) {
// Someone else is doing the join.
while (state != SHUTDOWN)
- wait();
+ monitor.wait();
return;
}
// I'm the joining thread
- assert(state == TERMINATE_SENT);
+ assert(state == TERMINATING);
state = JOINING;
} // Drop the lock.
@@ -80,12 +91,13 @@
workers[i].join();
}
state = SHUTDOWN;
- notifyAll(); // Notify other join() threaeds.
+ monitor.notifyAll(); // Notify any other join() threads.
}
void EventChannelThreads::addThread() {
- ScopedLock l(*this);
- workers.push_back(Thread(*this));
+ Monitor::ScopedLock l(monitor);
+ if (workers.size() < maxThreads)
+ workers.push_back(Thread(*this));
}
void EventChannelThreads::run()
@@ -94,23 +106,20 @@
AtomicCount::ScopedIncrement inc(nWaiting);
try {
while (true) {
- Event* e = channel->getEvent();
+ Event* e = channel->wait();
assert(e != 0);
- if (e == &terminate) {
- return;
- }
AtomicCount::ScopedDecrement dec(nWaiting);
- // I'm no longer waiting, make sure someone is.
- if (dec == 0)
+ // Make sure there's at least one waiting thread.
+ if (dec == 0 && state == RUNNING)
addThread();
e->dispatch();
}
}
- catch (const std::exception& e) {
- QPID_LOG(error, e.what());
+ catch (const EventChannel::ShutdownException& e) {
+ return;
}
- catch (...) {
- QPID_LOG(error, "unknown exception");
+ catch (const std::exception& e) {
+ Exception::log(e, "Exception in EventChannelThreads::run()");
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h Mon Jun 18 05:11:32 2007
@@ -18,14 +18,16 @@
* limitations under the License.
*
*/
-#include <vector>
+#include "EventChannel.h"
#include "qpid/Exception.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/AtomicCount.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/AtomicCount.h"
-#include "EventChannel.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
namespace qpid {
namespace sys {
@@ -33,26 +35,33 @@
/**
Dynamic thread pool serving an EventChannel.
- Threads run a loop { e = getEvent(); e->dispatch(); }
+ Threads run a loop { e = wait(); e->dispatch(); }
The size of the thread pool is automatically adjusted to optimal size.
*/
class EventChannelThreads :
public qpid::SharedObject<EventChannelThreads>,
- public sys::Monitor, private sys::Runnable
+ private sys::Runnable
{
public:
- /** Create the thread pool and start initial threads. */
+ /** Constant to represent an unlimited number of threads */
+ static const size_t unlimited;
+
+ /**
+ * Create the thread pool and start initial threads.
+ * @param minThreads Pool will initialy contain minThreads threads and
+ * will never shrink to less until shutdown.
+ * @param maxThreads Pool will never grow to more than maxThreads.
+ */
static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel
+ EventChannel::shared_ptr channel = EventChannel::create(),
+ size_t minThreads = 1,
+ size_t maxThreads = unlimited
);
~EventChannelThreads();
/** Post event to the underlying channel */
- void postEvent(Event& event) { channel->postEvent(event); }
-
- /** Post event to the underlying channel Must not be 0. */
- void postEvent(Event* event) { channel->postEvent(event); }
+ void post(Event& event) { channel->post(event); }
/**
* Terminate all threads.
@@ -68,21 +77,25 @@
private:
typedef std::vector<sys::Thread> Threads;
typedef enum {
- RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ RUNNING, TERMINATING, JOINING, SHUTDOWN
} State;
- EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ EventChannelThreads(
+ EventChannel::shared_ptr channel, size_t min, size_t max);
+
void addThread();
void run();
bool keepRunning();
void adjustThreads();
+ Monitor monitor;
+ size_t minThreads;
+ size_t maxThreads;
EventChannel::shared_ptr channel;
Threads workers;
sys::AtomicCount nWaiting;
State state;
- Event terminate;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h Mon Jun 18 05:11:32 2007
@@ -43,7 +43,7 @@
int getErrNo() { return errNo; }
- Exception* clone() const throw() { return new PosixError(*this); }
+ Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); }
void throwSelf() const { throw *this; }
@@ -55,6 +55,10 @@
/** Create a PosixError for the current file/line and errno. */
#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+
+/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+#define QPID_POSIX_CHECK(RESULT) \
+ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
/** Throw a posix error if errNo is non-zero */
#define QPID_POSIX_THROW_IF(ERRNO) \
Added: incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <iostream>
+#include <boost/bind.hpp>
+
+using namespace std;
+using namespace qpid::sys;
+
+int writeALot(int fd, const string& s) {
+ int bytesWritten = 0;
+ do {
+ errno = 0;
+ int lastWrite = ::write(fd, s.c_str(), s.size());
+ if ( lastWrite >= 0) {
+ bytesWritten += lastWrite;
+ }
+ } while (errno != EAGAIN);
+ return bytesWritten;
+}
+
+int readALot(int fd) {
+ int bytesRead = 0;
+ char buf[10240];
+
+ do {
+ errno = 0;
+ int lastRead = ::read(fd, buf, sizeof(buf));
+ if ( lastRead >= 0) {
+ bytesRead += lastRead;
+ }
+ } while (errno != EAGAIN);
+ return bytesRead;
+}
+
+int64_t writtenBytes = 0;
+int64_t readBytes = 0;
+
+void writer(DispatchHandle& h, int fd, const string& s) {
+ writtenBytes += writeALot(fd, s);
+ h.rewatch();
+}
+
+void reader(DispatchHandle& h, int fd) {
+ readBytes += readALot(fd);
+ h.rewatch();
+}
+
+int main(int argc, char** argv)
+{
+ // Create poller
+ Poller::shared_ptr poller(new Poller);
+
+ // Create dispatcher thread
+ Dispatcher d(poller);
+ Dispatcher d1(poller);
+ //Dispatcher d2(poller);
+ //Dispatcher d3(poller);
+ Thread dt(d);
+ Thread dt1(d1);
+ //Thread dt2(d2);
+ //Thread dt3(d3);
+
+ // Setup sender and receiver
+ int sv[2];
+ int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv);
+ assert(rc >= 0);
+
+ // Set non-blocking
+ rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+
+ rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+
+ // Make up a large string
+ string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
+ for (int i = 0; i < 8; i++)
+ testString += testString;
+
+ DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0);
+ DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString));
+
+ rh.watch(poller);
+ wh.watch(poller);
+
+ // wait 2 minutes then shutdown
+ sleep(60);
+
+ poller->shutdown();
+ dt.join();
+ dt1.join();
+ //dt2.join();
+ //dt3.join();
+
+ cout << "Wrote: " << writtenBytes << "\n";
+ cout << "Read: " << readBytes << "\n";
+
+ return 0;
+}
Added: incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Use socketpair to test the poller
+ */
+
+#include "qpid/sys/Poller.h"
+
+#include <string>
+#include <iostream>
+#include <memory>
+#include <exception>
+
+#include <assert.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+using namespace std;
+using namespace qpid::sys;
+
+int writeALot(int fd, const string& s) {
+ int bytesWritten = 0;
+ do {
+ errno = 0;
+ int lastWrite = ::write(fd, s.c_str(), s.size());
+ if ( lastWrite >= 0) {
+ bytesWritten += lastWrite;
+ }
+ } while (errno != EAGAIN);
+ return bytesWritten;
+}
+
+int readALot(int fd) {
+ int bytesRead = 0;
+ char buf[1024];
+
+ do {
+ errno = 0;
+ int lastRead = ::read(fd, buf, sizeof(buf));
+ if ( lastRead >= 0) {
+ bytesRead += lastRead;
+ }
+ } while (errno != EAGAIN);
+ return bytesRead;
+}
+
+int main(int argc, char** argv)
+{
+ try
+ {
+ int sv[2];
+ int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv);
+ assert(rc >= 0);
+
+ // Set non-blocking
+ rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+
+ rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+
+ // Make up a large string
+ string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
+ for (int i = 0; i < 6; i++)
+ testString += testString;
+
+ // Read as much as we can from socket 0
+ int bytesRead = readALot(sv[0]);
+ assert(bytesRead == 0);
+ cout << "Read(0): " << bytesRead << " bytes\n";
+
+ // Write as much as we can to socket 0
+ int bytesWritten = writeALot(sv[0], testString);
+ cout << "Wrote(0): " << bytesWritten << " bytes\n";
+
+ // Read as much as we can from socket 1
+ bytesRead = readALot(sv[1]);
+ assert(bytesRead == bytesWritten);
+ cout << "Read(1): " << bytesRead << " bytes\n";
+
+ auto_ptr<Poller> poller(new Poller);
+
+ PollerHandle h0(sv[0]);
+ PollerHandle h1(sv[1]);
+
+ poller->addFd(h0, Poller::INOUT);
+
+ // Wait for 500ms - h0 should be writable
+ Poller::Event event = poller->wait();
+ assert(event.handle == &h0);
+ assert(event.dir == Poller::OUT);
+
+ // Write as much as we can to socket 0
+ bytesWritten = writeALot(sv[0], testString);
+ cout << "Wrote(0): " << bytesWritten << " bytes\n";
+
+ // Wait for 500ms - h0 no longer writable
+ poller->rearmFd(h0);
+ event = poller->wait(500000000);
+ assert(event.handle == 0);
+
+ // Test we can read it all now
+ poller->addFd(h1, Poller::INOUT);
+ event = poller->wait();
+ assert(event.handle == &h1);
+ assert(event.dir == Poller::INOUT);
+
+ bytesRead = readALot(sv[1]);
+ assert(bytesRead == bytesWritten);
+ cout << "Read(1): " << bytesRead << " bytes\n";
+
+ // At this point h1 should have been disabled from the poller
+ // (as it was just returned) and h0 can write again
+ event = poller->wait();
+ assert(event.handle == &h0);
+ assert(event.dir == Poller::OUT);
+
+ // Now both the handles should be disabled
+ event = poller->wait(500000000);
+ assert(event.handle == 0);
+
+ // Test shutdown
+ poller->shutdown();
+ event = poller->wait();
+ assert(event.handle == 0);
+ assert(event.dir == Poller::SHUTDOWN);
+
+ event = poller->wait();
+ assert(event.handle == 0);
+ assert(event.dir == Poller::SHUTDOWN);
+
+ poller->delFd(h1);
+ poller->delFd(h0);
+
+ return 0;
+ } catch (exception& e) {
+ cout << "Caught exception " << e.what() << "\n";
+ }
+}
+
+