You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/06/18 14:11:34 UTC

svn commit: r548337 - in /incubator/qpid/trunk/qpid/cpp: ./ src/ src/qpid/ src/qpid/log/ src/qpid/sys/ src/qpid/sys/epoll/ src/qpid/sys/posix/ src/tests/

Author: astitcher
Date: Mon Jun 18 05:11:32 2007
New Revision: 548337

URL: http://svn.apache.org/viewvc?view=rev&rev=548337
Log:
Intermediate checkin with preliminary work on epoll based net IO

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Mon Jun 18 05:11:32 2007
@@ -121,7 +121,6 @@
   PKG_CHECK_MODULES([APR], [apr-1 >= $APR_MINIMUM_VERSION])
   APR_CXXFLAGS="$APR_CFLAGS"
   if test "$enable_APR_NETIO" = yes; then
-  	APR_CXXFLAGS+=" -DUSE_APR_NETIO=1"
 	USE_APR_NETIO=1
   fi
   if test "$enable_APR_PLATFORM" = yes; then

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Jun 18 05:11:32 2007
@@ -69,6 +69,7 @@
 
 posix_plat_src = \
   qpid/sys/posix/check.cpp \
+  qpid/sys/epoll/EpollPoller.cpp \
   qpid/sys/posix/Socket.cpp \
   qpid/sys/posix/Time.cpp \
   qpid/sys/posix/Thread.cpp
@@ -155,7 +156,6 @@
   gen/qpid/framing/AMQP_MethodVersionMap.cpp \
   gen/qpid/framing/AMQP_ServerProxy.cpp \
   qpid/Exception.cpp \
-  qpid/ExceptionHolder.cpp \
   qpid/Url.h \
   qpid/Url.cpp \
   qpid/QpidError.cpp \
@@ -345,6 +345,7 @@
   qpid/framing/amqp_types_full.h \
   qpid/sys/Acceptor.h \
   qpid/sys/AtomicCount.h \
+  qpid/sys/Dispatcher.h \
   qpid/sys/Condition.h \
   qpid/sys/ConnectionInputHandler.h \
   qpid/sys/ConnectionInputHandlerFactory.h \
@@ -352,6 +353,7 @@
   qpid/sys/Module.h \
   qpid/sys/Monitor.h \
   qpid/sys/Mutex.h \
+  qpid/sys/Poller.h \
   qpid/sys/ProducerConsumer.h \
   qpid/sys/Runnable.h \
   qpid/sys/ScopedIncrement.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Mon Jun 18 05:11:32 2007
@@ -42,18 +42,34 @@
 
 Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); }
 
+Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
+
 Exception::~Exception() throw() {}
 
 const char* Exception::what() const throw() { return whatStr.c_str(); }
 
 std::string Exception::toString() const throw() { return whatStr; }
 
-Exception* Exception::clone() const throw() { return new Exception(*this); }
+Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); }
 
 void Exception::throwSelf() const  { throw *this; }
 
 ShutdownException::ShutdownException() : Exception("Shut down.") {}
 
 EmptyException::EmptyException() : Exception("Empty.") {}
+
+const char* Exception::defaultMessage = "Unexpected exception";
+
+void Exception::log(const char* what, const char* message) {
+    QPID_LOG(error, message << ": " << what);
+}
+
+void Exception::log(const std::exception& e, const char* message) {
+    log(e.what(), message);
+}
+
+void Exception::logUnknown(const char* message) {
+    log("unknown exception.", message);
+}
 
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Mon Jun 18 05:11:32 2007
@@ -21,13 +21,15 @@
  * under the License.
  *
  */
+
+#include "framing/amqp_types.h"
+
 #include <exception>
 #include <string>
 #include <memory>
 #include <boost/shared_ptr.hpp>
 #include <boost/lexical_cast.hpp>
-
-#include "framing/amqp_types.h"
+#include <boost/function.hpp>
 
 namespace qpid
 {
@@ -44,6 +46,10 @@
     std::string whatStr;
 
   public:
+    typedef boost::shared_ptr<Exception> shared_ptr;
+    typedef boost::shared_ptr<const Exception> shared_ptr_const;
+    typedef std::auto_ptr<Exception> auto_ptr;
+
     Exception() throw();
     Exception(const std::string& str) throw();
     Exception(const char* str) throw();
@@ -59,10 +65,61 @@
     virtual const char* what() const throw();
     virtual std::string toString() const throw();
 
-    virtual Exception* clone() const throw();
+    virtual auto_ptr clone() const throw();
     virtual void throwSelf() const;
 
-    typedef boost::shared_ptr<Exception> shared_ptr;
+    /** Default message prefix for the log functions: "Unexpected exception". */
+    static const char* defaultMessage;
+
+    /**
+     * Log a message of the form "message: what"
+     *@param what Exception's what() message.
+     *@param message Prefix message.
+     */
+    static void log(const char* what, const char* message = defaultMessage);
+
+    /**
+     * Log an exception.
+     *@param e Exception to log.
+     *@param message Prefix message.
+     */
+    static void log(
+        const std::exception& e, const char* message = defaultMessage);
+    
+
+    /**
+     * Log an unknown exception - use in catch(...)
+     *@param message Prefix message.
+     */
+    static void logUnknown(const char* message = defaultMessage);
+
+    /**
+     * Wrapper template function to call another function inside
+     * try/catch and log any exception. Use boost::bind to wrap
+     * member function calls or functions with arguments.
+     *
+     *@param f Function to call in try block.
+     *@param rethrow If true the exception is rethrown after being logged.
+     *@param message Prefix message.
+     *@return Result of f(), or T() if an exception was logged and not rethrown.
+     */
+    template <class T>
+    static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
+                         const char* message=defaultMessage)
+    {
+        try {
+            return f();
+        }
+        catch (const std::exception& e) {
+            log(e, message);
+            if (rethrow) throw;
+        }
+        catch (...) {
+            logUnknown(message);
+            if (rethrow) throw;
+        }
+        return T(); // Only reached when an exception was swallowed.
+    }
 };
 
 struct ChannelException : public Exception {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp Mon Jun 18 05:11:32 2007
@@ -30,7 +30,7 @@
 
 QpidError::~QpidError() throw() {}
 
-Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); }
 
 void QpidError::throwSelf() const  { throw *this; }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h Mon Jun 18 05:11:32 2007
@@ -52,7 +52,7 @@
     { init(); }
         
     ~QpidError() throw();
-    Exception* clone() const throw();
+    Exception::auto_ptr clone() const throw();
     void throwSelf() const;
 
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Mon Jun 18 05:11:32 2007
@@ -19,7 +19,6 @@
  *
  */
 
-#include "qpid/log/Statement.h"
 #include <boost/current_function.hpp>
 #include <sstream>
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Dispatcher.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+Dispatcher::Dispatcher(Poller::shared_ptr poller0) :
+  poller(poller0) {
+}
+
+Dispatcher::~Dispatcher() {
+}
+    
+void Dispatcher::run() {
+    do {
+        Poller::Event event = poller->wait();
+        // Poller::wait guarantees to return an event
+        DispatchHandle* h = static_cast<DispatchHandle*>(event.handle);
+        switch (event.dir) {
+        case Poller::IN:
+            h->readableCallback(*h);
+            break;
+        case Poller::OUT:
+            h->writableCallback(*h);
+            break;
+        case Poller::INOUT:
+            h->readableCallback(*h);
+            h->writableCallback(*h);
+            break;
+        case Poller::SHUTDOWN:
+            goto dispatcher_shutdown;
+        default:
+            ;
+        }
+    } while (true);
+    
+dispatcher_shutdown:
+    ;
+}
+
+void DispatchHandle::watch(Poller::shared_ptr poller0) {
+    bool r = readableCallback;
+    bool w = writableCallback;
+    
+    // If no callbacks set then do nothing (that is what we were asked to do!)
+    // TODO: Maybe this should be an assert instead
+    if (!r && !w)
+        return;
+
+    Poller::Direction d = r ?
+        (w ? Poller::INOUT : Poller::IN) :
+        Poller::OUT;
+
+    poller = poller0;
+    poller->addFd(*this, d);
+}
+
+void DispatchHandle::rewatch() {
+    assert(poller);
+    poller->rearmFd(*this);
+}
+
+void DispatchHandle::unwatch() {
+    poller->delFd(*this);
+    poller.reset();
+}
+
+}}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Mon Jun 18 05:11:32 2007
@@ -0,0 +1,74 @@
+#ifndef _sys_Dispatcher_h
+#define _sys_Dispatcher_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "Poller.h"
+#include "Runnable.h"
+
+#include <memory>
+#include <boost/function.hpp>
+
+#include <assert.h>
+
+
+namespace qpid {
+namespace sys {
+
+class Dispatcher;
+class DispatchHandle : public PollerHandle {
+    friend class Dispatcher;
+public:
+    typedef boost::function1<void, DispatchHandle&> Callback;
+
+private:
+    Callback readableCallback;
+    Callback writableCallback;
+    Poller::shared_ptr poller;
+
+public:
+    
+    DispatchHandle(int fd, Callback rCb, Callback wCb) :
+      PollerHandle(fd),
+      readableCallback(rCb),
+      writableCallback(wCb)
+    {}
+
+    void watch(Poller::shared_ptr poller);
+    void rewatch();
+    void unwatch();
+};
+
+class Dispatcher : public Runnable {
+    const Poller::shared_ptr poller;
+
+public:
+    Dispatcher(Poller::shared_ptr poller);
+    ~Dispatcher();
+    
+    void run();
+};
+
+}}
+
+#endif // _sys_Dispatcher_h

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon Jun 18 05:11:32 2007
@@ -0,0 +1,93 @@
+#ifndef _sys_Poller_h
+#define _sys_Poller_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+#include <stdint.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Handle class to use for polling
+ */
+class Poller;
+class PollerHandlePrivate;
+class PollerHandle {
+    friend class Poller;
+
+    PollerHandlePrivate* impl;
+    const int fd;
+
+public:
+    PollerHandle(int fd0);
+    virtual ~PollerHandle();
+
+    int getFD() const { return fd; }
+};
+
+/**
+ * Poller: abstract class to encapsulate a file descriptor poll to be used
+ * by a reactor
+ */
+class PollerPrivate;
+class Poller {
+    PollerPrivate* impl;
+
+public:
+    typedef boost::shared_ptr<Poller> shared_ptr;
+
+    enum Direction {
+        NONE,
+        IN,
+        OUT,
+        INOUT,
+        SHUTDOWN
+    };
+
+    struct Event {
+        PollerHandle* handle;
+        Direction dir;
+        
+        Event(PollerHandle* handle0, Direction dir0) :
+          handle(handle0),
+          dir(dir0) {
+        }
+    };
+    
+    Poller();
+    ~Poller();
+    void shutdown();
+
+    void addFd(PollerHandle& handle, Direction dir);
+    void delFd(PollerHandle& handle);
+    void modFd(PollerHandle& handle, Direction dir);
+    void rearmFd(PollerHandle& handle);
+    Event wait(Duration timeout = TIME_INFINITE);
+};
+
+}}
+#endif // _sys_Poller_h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h Mon Jun 18 05:11:32 2007
@@ -20,19 +20,27 @@
  */
 
 #include <boost/noncopyable.hpp>
+#include <boost/function.hpp>
 
 namespace qpid {
 namespace sys {
 
-/** Increment counter in constructor and decrement in destructor. */
+/**
+ * Increment counter in constructor and decrement in destructor.
+ * Optionally call a function if the decremented counter value is 0.
+ * Note the function must not throw, it is called in the destructor.
+ */
 template <class T>
 class ScopedIncrement : boost::noncopyable
 {
   public:
-    ScopedIncrement(T& c) : count(c) { ++count; }
-    ~ScopedIncrement() { --count; }
+    ScopedIncrement(T& c, boost::function0<void> f=0)
+        : count(c), callback(f) { ++count; }
+    ~ScopedIncrement() { if (--count == 0 && callback) callback(); }
+
   private:
     T& count;
+    boost::function0<void> callback;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Mon Jun 18 05:11:32 2007
@@ -97,8 +97,12 @@
 /** Nanoseconds per nanosecond. */
 const Duration TIME_NSEC = 1;
 
+/** Value to represent an infinite timeout */
+const Duration TIME_INFINITE = std::numeric_limits<int64_t>::max();
+
 /** Time greater than any other time */
 const AbsTime FAR_FUTURE = AbsTime::FarFuture();
+
 }}
 
 #endif  /*!_sys_Time_h*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/posix/check.h"
+
+#include <sys/epoll.h>
+#include <errno.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+    friend class Poller;
+    friend class PollerHandle;
+
+    enum FDStat {
+        ABSENT,
+        MONITORED,
+        INACTIVE
+    };
+
+    ::__uint32_t events;
+    FDStat stat;
+    Mutex lock;
+
+    PollerHandlePrivate() :
+      events(0),
+      stat(ABSENT) {
+    }
+};
+
+PollerHandle::PollerHandle(int fd0) :
+    impl(new PollerHandlePrivate),
+    fd(fd0)
+{}
+    
+PollerHandle::~PollerHandle() {
+    delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Linux specific epoll
+ * interface
+ */
+class PollerPrivate {
+    friend class Poller;
+
+    static const int DefaultFds = 256;
+
+    struct ReadablePipe {
+        int fds[2];
+
+        /**
+         * This encapsulates an always readable pipe which we can add
+         * to the epoll set to force epoll_wait to return
+         */
+        ReadablePipe() {
+            QPID_POSIX_CHECK(::pipe(fds));
+            // Write the pipe's own fds (all sizeof(fds) bytes) so the read end has data
+            QPID_POSIX_CHECK(::write(fds[1], fds, sizeof(fds)));
+        }
+
+        ~ReadablePipe() {
+            ::close(fds[0]);
+            ::close(fds[1]);
+        }
+
+        int getFD() {
+            return fds[0];
+        }
+    };
+    
+    static ReadablePipe alwaysReadable;
+    
+    const int epollFd;
+    bool isShutdown;
+
+    static ::__uint32_t directionToEpollEvent(Poller::Direction dir) {
+        switch (dir) {
+            case Poller::IN: return ::EPOLLIN;
+            case Poller::OUT: return ::EPOLLOUT;
+            case Poller::INOUT: return ::EPOLLIN | ::EPOLLOUT;
+            default: return 0;
+        }
+    }
+    
+    static Poller::Direction epollToDirection(::__uint32_t events) {
+        ::__uint32_t e = events & (::EPOLLIN | ::EPOLLOUT);
+        switch (e) {
+            case ::EPOLLIN: return Poller::IN;
+            case ::EPOLLOUT: return Poller::OUT;
+            case ::EPOLLIN | ::EPOLLOUT: return Poller::INOUT;
+            default: return Poller::NONE;
+        }
+    }
+
+    PollerPrivate() :
+        epollFd(::epoll_create(DefaultFds)),
+        isShutdown(false) {
+        QPID_POSIX_CHECK(epollFd);
+    }
+
+    ~PollerPrivate() {
+        // It's probably okay to ignore any errors here as there can't be data loss
+        ::close(epollFd);
+    }
+};
+
+PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
+
+void Poller::addFd(PollerHandle& handle, Direction dir) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    ::epoll_event epe;
+    int op;
+    
+    if (eh.stat == PollerHandlePrivate::ABSENT) {
+        op = EPOLL_CTL_ADD;
+        epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+    } else {
+        assert(eh.stat == PollerHandlePrivate::MONITORED);
+        op = EPOLL_CTL_MOD;
+        epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
+    }
+    epe.data.ptr = &handle;
+    
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+    
+    // Record monitoring state of this fd
+    eh.events = epe.events;
+    eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::delFd(PollerHandle& handle) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(eh.stat != PollerHandlePrivate::ABSENT);
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0));
+    eh.stat = PollerHandlePrivate::ABSENT;
+}
+
+// modFd is equivalent to delFd followed by addFd
+void Poller::modFd(PollerHandle& handle, Direction dir) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(eh.stat != PollerHandlePrivate::ABSENT);
+    
+    ::epoll_event epe;
+    epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+    epe.data.ptr = &handle;
+    
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+    
+    // Record monitoring state of this fd
+    eh.events = epe.events;
+    eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::rearmFd(PollerHandle& handle) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(eh.stat == PollerHandlePrivate::INACTIVE);
+
+    ::epoll_event epe;
+    epe.events = eh.events;        
+    epe.data.ptr = &handle;
+
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+
+    eh.stat = PollerHandlePrivate::MONITORED;
+}
+
+void Poller::shutdown() {
+    // Don't use any locking here - isshutdown will be visible to all
+    // after the epoll_ctl() anyway (it's a memory barrier)
+    impl->isShutdown = true;
+    
+    // Add always readable fd to epoll (not EPOLLONESHOT)
+    int fd = impl->alwaysReadable.getFD();
+    ::epoll_event epe;
+    epe.events = ::EPOLLIN;
+    epe.data.ptr = 0;
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, fd, &epe));
+}
+
+// Wait for one event. Returns SHUTDOWN once shutdown() has been called, NONE
+// if a finite wait ends without a deliverable event, else the ready handle.
+Poller::Event Poller::wait(Duration timeout) {
+    epoll_event epe;
+    int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
+
+    // Repeat until we weren't interrupted
+    do {
+        int rc = ::epoll_wait(impl->epollFd, &epe, 1, timeoutMs);
+        if (impl->isShutdown) {
+            return Event(0, SHUTDOWN);
+        }
+
+        if (rc ==-1 && errno != EINTR) {
+            QPID_POSIX_CHECK(rc);
+        } else if (rc > 0) {
+            assert(rc == 1);
+            PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+            PollerHandlePrivate& eh = *handle->impl;
+            ScopedLock<Mutex> l(eh.lock);
+
+            // the handle could have gone inactive since we left the epoll_wait
+            if (eh.stat == PollerHandlePrivate::MONITORED) {
+                eh.stat = PollerHandlePrivate::INACTIVE;
+                return Event(handle, PollerPrivate::epollToDirection(epe.events));
+            }
+        }
+        // We only get here if one of the following:
+        // * epoll_wait was interrupted by a signal
+        // * epoll_wait timed out
+        // * the state of the handle changed after being returned by epoll_wait
+        //
+        // The only things we can do here are return a timeout or wait more.
+        // Obviously if we timed out we return timeout; if the wait was meant to
+        // be indefinite then we should never return with a time out so we go again.
+        // If the wait wasn't indefinite, but we were interrupted then we have to return
+        // with a timeout as we don't know how long we've waited so far and so we can't
+        // continue the wait.
+        if (rc == 0 || timeoutMs != -1) {
+            return Event(0, NONE);
+        }
+    } while (true);
+}
+
+// Concrete constructors
+Poller::Poller() :
+    impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+    delete impl;
+}
+
+}}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp Mon Jun 18 05:11:32 2007
@@ -1,4 +1,4 @@
-/*
+/* 
  *
  * Copyright (c) 2006 The Apache Software Foundation
  *
@@ -16,6 +16,19 @@
  *
  */
 
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
+#include "EventChannel.h"
+#include "check.h"
+
+#include "qpid/QpidError.h"
+#include "qpid/sys/AtomicCount.h"
+
 #include <mqueue.h>
 #include <string.h>
 #include <iostream>
@@ -29,139 +42,420 @@
 #include <queue>
 
 #include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
-
-#include "qpid/QpidError.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/log/Statement.h"
-
-#include "check.h"
-#include "EventChannel.h"
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
 
 using namespace std;
 
 
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
-    ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-    
 namespace qpid {
 namespace sys {
 
 
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+
+typedef std::pair<Event*, Event*> EventPair;
+
 /**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */ 
-class EventHandler : public Event, private Monitor
+ * Template to zero out a C-struct on construction. Avoids uninitialized memory
+ * warnings from valgrind or other mem checking tool.
+ */
+template <class T> struct CleanStruct : public T {
+    CleanStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+} // namespace
+
+/**
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
 {
   public:
-    EventHandler(int epollSize = 256);
-    ~EventHandler();
+    Queue(Descriptor& container, Direction dir);
 
-    int getEpollFd() { return epollFd; }
-    void epollAdd(int fd, uint32_t epollEvents, Event* event);
-    void epollMod(int fd, uint32_t epollEvents, Event* event);
-    void epollDel(int fd);
+    /** Called by Event classes in prepare() */ 
+    void push(Event* e);
 
-    void mqPut(Event* event);
-    Event* mqGet();
-    
-  protected:
-    // Should never be called, only complete.
-    void prepare(EventHandler&) { assert(0); }
-    Event* complete(EventHandler& eh);
+    /** Called when epoll wakes.
+     *@return The next completed event or 0.
+     */
+    Event* wake(uint32_t epollFlags);
+
+    Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
+
+    bool empty() { return queue.empty(); }
+
+    void setBit(uint32_t &epollFlags);
+
+    void shutdown();
     
   private:
+    typedef std::deque<Event*> EventQ; 
+
+    inline bool isMyEvent(uint32_t flags) { return (flags & myEvent) != 0; } // True iff this queue's epoll bit is set in flags.
+
+    Mutex& lock;                // Shared with Descriptor.
+    Descriptor& descriptor;
+    uint32_t myEvent;           // Epoll event flag.
+    EventQ queue;
+};
+
+
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+  public:
+    explicit Descriptor(int fd) : epollFd(-1), myFd(fd), // epollFd == -1: isShutdown() until activate().
+                   inQueue(*this, IN), outQueue(*this, OUT), preferIn(false) {} // preferIn is toggled by pick(); must start defined.
+
+    void activate(int epollFd_);
+
+    /** Epoll woke up for this descriptor. */
+    Event* wake(uint32_t epollEvents);
+
+    /** Shut down: close and remove file descriptor.
+     * May be re-activated if fd is reused.
+     */
+    void shutdown();
+
+    // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+    void shutdownUnsafe();      
+
+    bool isShutdown() { return epollFd == -1; }
+
+    Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+    int getFD() const { return myFd; }
+
+  private:
+    void update();
+    void epollCtl(int op, uint32_t events);
+    Queue* pick();
+
+    Mutex lock;
     int epollFd;
-    std::string mqName;
-    int mqFd;
-    std::queue<Event*> mqEvents;
+    int myFd;
+    Queue inQueue, outQueue;
+    bool preferIn;
+
+  friend class Queue;
 };
 
-EventHandler::EventHandler(int epollSize)
-{
-    epollFd = epoll_create(epollSize);
-    if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+ 
+/**
+ * Holds a map of Descriptors, which do most of the work.
+ */
+class EventChannel::Impl {
+  public:
+    Impl(int size = 256);
 
-    // Create a POSIX message queue for non-fd events.
-    // We write one byte and never read it is always ready for read
-    // when we add it to epoll.
-    // 
-    ZeroStruct<struct mq_attr> attr;
-    attr.mq_maxmsg = 1;
-    attr.mq_msgsize = 1;
-    do {
-        char tmpnam[L_tmpnam];
-        tmpnam_r(tmpnam);
-        mqName = tmpnam + 4; // Skip "tmp/"
-        mqFd = mq_open(
-            mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
-        if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
-    } while (mqFd == EEXIST);  // Name already taken, try again.
+    ~Impl();
 
-    static char zero = '\0';
-    mq_send(mqFd, &zero, 1, 0);
-    epollAdd(mqFd, 0, this);
+    /**
+     * Activate descriptor
+     */
+    void activate(Descriptor& d) {
+    	d.activate(epollFd);
+    }
+
+    /** Wait for an event, return 0 on timeout */
+    Event* wait(Duration timeout);
+
+    void shutdown();
+
+  private:
+
+    Monitor monitor;     
+    int epollFd;
+    int shutdownPipe[2];
+    AtomicCount nWaiters;
+    bool isShutdown;
+};
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) :
+    lock(d.lock), descriptor(d),
+    myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+    Mutex::ScopedLock l(lock);
+    if (descriptor.isShutdown())
+        THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+    queue.push_back(e);
+    descriptor.update(); 
+}
+
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+    if (queue.empty())
+        epollFlags &= ~myEvent;
+    else
+        epollFlags |= myEvent;
+}
+
+// TODO aconway 2006-12-20: REMOVE
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+    // Called with lock held.
+    if (!queue.empty() && (isMyEvent(epollFlags))) {
+        assert(!queue.empty());
+        Event* e = queue.front();
+        assert(e);
+        if (!e->getException()) {
+            // TODO aconway 2006-12-20: Can/should we move event completion
+            // out into dispatch() so it doesn't happen in Descriptor locks?
+            e->complete(descriptor);
+        }
+        queue.pop_front();
+        return e;
+    }
+    return 0;
+}
+        
+void EventChannel::Queue::shutdown() {
+    // Mark all pending events with a shutdown exception.
+    // The server threads will remove and dispatch the events.
+    // 
+    qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, SRCLINE);
+    for_each(queue.begin(), queue.end(),
+             boost::bind(&Event::setException, _1, ex));
 }
 
-EventHandler::~EventHandler() {
-    mq_close(mqFd);
-    mq_unlink(mqName.c_str());
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_) {
+    Mutex::ScopedLock l(lock);
+    if (isShutdown()) {
+        epollFd = epollFd_;     // We're back in business.
+        epollCtl(EPOLL_CTL_ADD, 0);
+    }
 }
 
-void EventHandler::mqPut(Event* event) {
-    ScopedLock l(*this);
-    assert(event != 0);
-    mqEvents.push(event);
-    epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+void EventChannel::Descriptor::shutdown() {
+    Mutex::ScopedLock l(lock);
+    shutdownUnsafe();
+}
+
+void EventChannel::Descriptor::shutdownUnsafe() {
+    // Caller holds lock.
+    ::close(myFd);
+    epollFd = -1;               // Mark myself as shutdown.
+    inQueue.shutdown();
+    outQueue.shutdown();
+}
+
+// TODO aconway 2006-12-20: Inline into wake().
+void EventChannel::Descriptor::update() {
+    // Caller holds lock.
+    if (isShutdown())             // Nothing to do
+        return;
+    uint32_t events =  EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+    inQueue.setBit(events);
+    outQueue.setBit(events);
+    epollCtl(EPOLL_CTL_MOD, events);
 }
+    
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+    // Caller holds lock
+    assert(!isShutdown());
+    CleanStruct<epoll_event> ee;
+    ee.data.ptr = this;
+    ee.events = events;
+    int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+    if (status < 0) {
+    	if (errno == EEXIST) // It's okay to add an existing fd
+    		return;
+        else if (errno == EBADF)     // FD was closed externally.
+            shutdownUnsafe();
+        else
+            throw QPID_POSIX_ERROR(errno);
+    }
+}
+    
 
-Event* EventHandler::mqGet() {
-    ScopedLock l(*this);
-    if (mqEvents.empty()) 
+EventChannel::Queue* EventChannel::Descriptor::pick() {
+    if (inQueue.empty() && outQueue.empty()) 
         return 0;
-    Event* event = mqEvents.front();
-    mqEvents.pop();
-    if(!mqEvents.empty())
-        epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
-    return event;
+    if (inQueue.empty() || outQueue.empty())
+        return !inQueue.empty() ? &inQueue : &outQueue;
+    // Neither is empty, pick fairly.
+    preferIn = !preferIn;
+    return preferIn ? &inQueue : &outQueue;
+}
+
+Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
+    Mutex::ScopedLock l(lock);
+    // On error, shut down the Descriptor and both queues.
+    if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+        shutdownUnsafe();
+        // TODO aconway 2006-12-20: This error handling model means
+        // that any error reported by epoll will result in a shutdown
+        // exception on the events. Can we get more accurate error
+        // reporting somehow?
+    }
+    Queue*q = 0;
+    bool in = (epollEvents & EPOLLIN);
+    bool out = (epollEvents & EPOLLOUT);
+    if ((in && out) || isShutdown()) 
+        q = pick();         // Choose fairly, either non-empty queue.
+    else if (in) 
+        q = &inQueue;
+    else if (out) 
+        q = &outQueue;
+    Event* e = (q && !q->empty()) ? q->pop() : 0;
+    update();
+    if (e)
+        e->complete(*this);
+    return e;
 }
 
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+    epollFd(-1), isShutdown(false)
 {
-    ZeroStruct<struct epoll_event> ee;
-    ee.data.ptr = event;
-    ee.events = epollEvents;
-    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) 
-        throw QPID_POSIX_ERROR(errno);
+    // Create the epoll file descriptor.
+    epollFd = epoll_create(epollSize);
+    QPID_POSIX_CHECK(epollFd);
+
+    // Create a pipe and write a single byte.  The byte is never
+    // read so the pipe's read fd is always ready for read.
+    // shutdown() adds the read fd to the epoll set to wake waiting threads.
+    QPID_POSIX_CHECK(::pipe(shutdownPipe));
+    static char zero = '\0';
+    QPID_POSIX_CHECK(::write(shutdownPipe[1], &zero, 1));
 }
 
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
-{
-    ZeroStruct<struct epoll_event> ee;
-    ee.data.ptr = event;
-    ee.events = epollEvents;
-    if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) 
-        throw QPID_POSIX_ERROR(errno);
+EventChannel::Impl::~Impl() {
+    shutdown();
+    ::close(epollFd);
+    ::close(shutdownPipe[0]);
+    ::close(shutdownPipe[1]);
 }
 
-void EventHandler::epollDel(int fd) {
-    if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
-        throw QPID_POSIX_ERROR(errno);
+
+void EventChannel::Impl::shutdown() {
+    Monitor::ScopedLock l(monitor);
+    if (!isShutdown) { // I'm starting shutdown.
+        isShutdown = true;
+        if (nWaiters == 0)
+            return;
+
+        // TODO aconway 2006-12-20: If I just close the epollFd will
+        // that wake all threads? If so with what? Would be simpler than:
+
+        CleanStruct<epoll_event> ee;
+        ee.data.ptr = 0;
+        ee.events = EPOLLIN;
+        QPID_POSIX_CHECK(
+            epoll_ctl(epollFd, EPOLL_CTL_ADD, shutdownPipe[0], &ee));
+    }
+    // Wait for nWaiters to get out.
+    while (nWaiters > 0) {
+        monitor.wait();
+    }
 }
 
-Event* EventHandler::complete(EventHandler& eh)
-{
-    assert(&eh == this);
-    Event* event =  mqGet();
-    return event==0 ? 0 : event->complete(eh);
+// TODO aconway 2006-12-20: DEBUG remove
+struct epoll {
+    epoll(uint32_t e) : events(e) { }
+    uint32_t events;
+};
+
+#define BIT(X) out << ((e.events & X) ? __STRING(X) "." : "")
+ostream& operator << (ostream& out, epoll e) {
+    out << "epoll_event.events: ";
+    BIT(EPOLLIN);
+    BIT(EPOLLPRI);
+    BIT(EPOLLOUT);
+    BIT(EPOLLRDNORM);
+    BIT(EPOLLRDBAND);
+    BIT(EPOLLWRNORM);
+    BIT(EPOLLWRBAND);
+    BIT(EPOLLMSG);
+    BIT(EPOLLERR);
+    BIT(EPOLLHUP);
+    BIT(EPOLLONESHOT);
+    BIT(EPOLLET);
+    return out;
 }
     
+    
+    
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Duration timeoutNs)
+{
+    {
+        Monitor::ScopedLock l(monitor);
+        if (isShutdown)
+            throw ShutdownException();
+    }
+    
+    // Increase nWaiters for the duration, notify the monitor if I'm
+    // the last one out.
+    // 
+    AtomicCount::ScopedIncrement si(
+        nWaiters, boost::bind(&Monitor::notifyAll, &monitor));
+
+    // No lock, all thread safe calls or local variables:
+    // 
+    const long timeoutMs =
+        (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+    CleanStruct<epoll_event> ee;
+    Event* event = 0;
+
+    // Loop till we get a completed event. Some events may repost
+    // themselves and return 0, e.g. incomplete read or write events.
+    //TODO aconway 2006-12-20: FIX THIS!
+    while (!event) {
+        int n = ::epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+        if (n == 0)             // Timeout
+            return 0;
+        if (n < 0 && errno == EINTR) // Interrupt, ignore it.
+            continue;
+        if (n < 0)              
+            throw QPID_POSIX_ERROR(errno);
+        assert(n == 1);
+        Descriptor* ed =
+            reinterpret_cast<Descriptor*>(ee.data.ptr);
+        if (ed == 0)            // We're being shut-down.
+            throw ShutdownException();
+        assert(ed != 0);
+        event = ed->wake(ee.events);
+    }
+    return event;
+}
+
+//EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+//    Mutex::ScopedLock l(monitor);
+//    Descriptor& ed = descriptors[fd];
+//    ed.activate(epollFd, fd);
+//    return ed;
+//}
+
+
 // ================================================================
 // EventChannel
 
@@ -169,157 +463,134 @@
     return shared_ptr(new EventChannel());
 }
 
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
 
 EventChannel::~EventChannel() {}
 
-void EventChannel::postEvent(Event& e) 
+void EventChannel::post(Event& e) 
 {
-    e.prepare(*handler);
+    e.prepare(*impl);
 }
 
-Event* EventChannel::getEvent()
+Event* EventChannel::wait(Duration timeoutNs)
 {
-    static const int infiniteTimeout = -1; 
-    ZeroStruct<struct epoll_event> epollEvent;
+    return impl->wait(timeoutNs);
+}
 
-    // Loop until we can complete the event. Some events may re-post
-    // themselves and return 0 from complete, e.g. partial reads. // 
-    Event* event = 0;
-    while (event == 0) {
-        int eventCount = epoll_wait(handler->getEpollFd(),
-                                    &epollEvent, 1, infiniteTimeout);
-        if (eventCount < 0) {
-            if (errno != EINTR) {
-                QPID_LOG(warn, "Ignoring error: "
-                         << PosixError::getMessage(errno));
-                assert(0);
-            }
-        }
-        else if (eventCount == 1) {
-            event = reinterpret_cast<Event*>(epollEvent.data.ptr);
-            assert(event != 0);
-            try {
-                event = event->complete(*handler);
-            }
-            catch (const Exception& e) {
-                if (event)
-                    event->setError(e);
-            }
-            catch (const std::exception& e) {
-                if (event)
-                    event->setError(e);
-            }
-        }
-    }
-    return event;
+void EventChannel::shutdown() {
+    impl->shutdown();
 }
 
+
+// ================================================================
+// Event and subclasses.
+
 Event::~Event() {}
     
-void Event::prepare(EventHandler& handler)
-{
-    handler.mqPut(this);
+Exception::shared_ptr_const Event::getException() const {
+    return exception;
 }
 
-bool Event::hasError() const {
-    return error;
+void Event::throwIfException() {
+    if (getException())
+        exception->throwSelf();
 }
 
-void Event::throwIfError() throw (Exception) {
-    if (hasError())
-        error.throwSelf();
-}
-
-Event* Event::complete(EventHandler&)
+void Event::dispatch()
 {
-    return this;
+    if (!callback.empty())
+        callback();
 }
 
-void Event::dispatch()
-{
+void Event::setException(const std::exception& e) {
+    const Exception* ex = dynamic_cast<const Exception*>(&e);
+    if (ex) 
+        exception.reset(ex->clone().release());
+    else 
+        exception.reset(new Exception(e));
+#ifndef NDEBUG
+    // Throw and re-catch the exception. Has no effect on the
+    // program but it triggers debuggers watching for throw.  The
+    // context that sets the exception is more informative for
+    // debugging purposes than the one that ultimately throws it.
+    // 
     try {
-        if (!callback.empty())
-            callback();
-    } catch (const std::exception&) {
-        throw;
-    } catch (...) {
-        throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+        throwIfException();
     }
+    catch (...) { }             // Ignored.
+#endif
 }
 
-void Event::setError(const ExceptionHolder& e) {
-    error = e;
+int FDEvent::getFDescriptor() const {
+	return descriptor.getFD();
 }
 
-void ReadEvent::prepare(EventHandler& handler)
-{
-    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+// TODO: AMS 21/12/06 Don't like the inline new, probably cause a memory leak
+ReadEvent::ReadEvent(int fd, void* buf, size_t sz,Callback cb, bool noWait) :
+	IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, noWait), buffer(buf), bytesRead(0) {
 }
 
-ssize_t ReadEvent::doRead() {
-    ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
-                       size - received);
-    if (n > 0) received += n;
-    return n;
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+	EventChannel::Descriptor& d = getDescriptor();
+	impl.activate(d);
+    d.getQueue(IN).push(this);
 }
 
-Event* ReadEvent::complete(EventHandler& handler)
+void ReadEvent::complete(EventChannel::Descriptor& ed)
 {
-    // Read as much as possible without blocking.
-    ssize_t n = doRead();
-    while (n > 0 && received < size) doRead();
-
-    if (received == size) {
-        handler.epollDel(descriptor);
-        received = 0;           // Reset for re-use.
-        return this;
-    }
-    else if (n <0 && (errno == EAGAIN)) {
-        // Keep polling for more.
-        handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
-        return 0;
-    }
-    else {
-        // Unexpected EOF or error. Throw ENODATA for EOF.
-        handler.epollDel(descriptor);
-        received = 0;           // Reset for re-use.
-        throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+    ssize_t n = ::read(getFDescriptor(),
+                       static_cast<char*>(buffer) + bytesRead,
+                       size - bytesRead);
+    if (n > 0)
+        bytesRead += n;
+    if (n == 0 || (n < 0 && errno != EAGAIN)) {
+        // Use ENODATA for file closed.
+        setException(QPID_POSIX_ERROR(n == 0 ? ENODATA : errno));
+        ed.shutdownUnsafe(); 
     }
 }
 
-void WriteEvent::prepare(EventHandler& handler)
-{
-    handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+WriteEvent::WriteEvent(int fd, const void* buf, size_t sz, Callback cb) :
+    // Pass a literal: naming noWait here would read IOEvent::noWait before it is initialised.
+    IOEvent(cb, *(new EventChannel::Descriptor(fd)), sz, false), buffer(buf), bytesWritten(0) {}
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+	EventChannel::Descriptor& d = getDescriptor();
+	impl.activate(d);
+    d.getQueue(OUT).push(this);
 }
 
-Event* WriteEvent::complete(EventHandler& handler)
+
+void WriteEvent::complete(EventChannel::Descriptor& ed)
 {
-    ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
-                      size - written);
-    if (n < 0) throw QPID_POSIX_ERROR(errno);
-    written += n;
-    if(written < size) {
-        // Keep polling.
-        handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
-        return 0;
+    ssize_t n = ::write(getFDescriptor(),
+                        static_cast<const char*>(buffer) + bytesWritten,
+                        size - bytesWritten);
+    if (n > 0)
+        bytesWritten += n;
+    if(n < 0 && errno != EAGAIN) {
+        setException(QPID_POSIX_ERROR(errno));
+        ed.shutdownUnsafe(); // Called with lock held.
     }
-    written = 0;                // Reset for re-use.
-    handler.epollDel(descriptor);
-    return this;
 }
 
-void AcceptEvent::prepare(EventHandler& handler)
-{
-    handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+AcceptEvent::AcceptEvent(int fd, Callback cb) :
+    FDEvent(cb, *(new EventChannel::Descriptor(fd))), accepted(0) {
+}
+
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+	EventChannel::Descriptor& d = getDescriptor();
+	impl.activate(d);
+    d.getQueue(IN).push(this);
 }
 
-Event* AcceptEvent::complete(EventHandler& handler)
+void AcceptEvent::complete(EventChannel::Descriptor& ed)
 {
-    handler.epollDel(descriptor);
-    accepted = ::accept(descriptor, 0, 0);
-    if (accepted < 0) throw QPID_POSIX_ERROR(errno);
-    return this;
+    accepted = ::accept(getFDescriptor(), 0, 0);
+    if (accepted < 0) {
+        setException(QPID_POSIX_ERROR(errno));
+        ed.shutdownUnsafe(); // Called with lock held.
+    }
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h Mon Jun 18 05:11:32 2007
@@ -20,7 +20,10 @@
  */
 
 #include "qpid/SharedObject.h"
-#include "qpid/ExceptionHolder.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
+
 #include <boost/function.hpp>
 #include <memory>
 
@@ -28,11 +31,57 @@
 namespace sys {
 
 class Event;
-class EventHandler;
-class EventChannel;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+  public:
+    static shared_ptr create();
+
+    /** Exception thrown from wait() if channel is shut down. */
+    class ShutdownException : public qpid::Exception {};
+
+    ~EventChannel();
+    
+    /** Post an event to the channel. */
+    void post(Event& event);
+
+    /**
+     * Wait for the next complete event, up to timeout.
+     *@return Pointer to event or 0 if timeout elapses.
+     *@exception ShutdownException if the channel is shut down.
+     */
+    Event* wait(Duration timeout = TIME_INFINITE);
+
+    /**
+     * Shut down the event channel.
+     * Blocks till all threads have exited wait()
+     */
+    void shutdown();
+
+
+    // Internal classes.
+    class Impl;
+    class Queue;
+    class Descriptor;
+    
+  private:
+
+    EventChannel();
+
+    Mutex lock;
+    boost::shared_ptr<Impl> impl;
+};
 
 /**
  * Base class for all Events.
+ * 
+ * Derived classes define events representing various async IO operations.
+ * When an event is complete, it is returned by the EventChannel to
+ * a thread calling wait. The thread will call Event::dispatch() to
+ * execute code associated with event completion.
  */
 class Event
 {
@@ -40,132 +89,121 @@
     /** Type for callback when event is dispatched */
     typedef boost::function0<void> Callback;
 
-    /**
-     * Create an event with optional callback.
-     * Instances of Event are sent directly through the channel.
-     * Derived classes define additional waiting behaviour.
-     *@param cb A callback functor that is invoked when dispatch() is called.
-     */
-    Event(Callback cb = 0) : callback(cb) {}
-
     virtual ~Event();
 
     /** Call the callback provided to the constructor, if any. */
     void dispatch();
 
-    /** True if there was an error processing this event */
-    bool hasError() const;
+    /**
+     *If there was an exception processing this Event, return it.
+     *@return 0 if there was no exception. 
+     */
+    qpid::Exception::shared_ptr_const getException() const;
+
+    /** If getException() is non-null, throw the corresponding exception. */
+    void throwIfException();
 
-    /** If hasError() throw the corresponding exception. */
-    void throwIfError() throw(Exception);
+    /** Set the dispatch callback. */
+    void setCallback(Callback cb) { callback = cb; }
+
+    /** Set the exception. */
+    void setException(const std::exception& e);
 
   protected:
-    virtual void prepare(EventHandler&);
-    virtual Event* complete(EventHandler&);
-    void setError(const ExceptionHolder& e);
+    Event(Callback cb=0) : callback(cb) {}
+
+    virtual void prepare(EventChannel::Impl&) = 0;
+    virtual void complete(EventChannel::Descriptor&) = 0;
 
     Callback callback;
-    ExceptionHolder error;
+    Exception::shared_ptr_const exception;
 
   friend class EventChannel;
-  friend class EventHandler;
+  friend class EventChannel::Queue;
 };
 
-template <class BufT>
-class IOEvent : public Event {
+/** Base class for events related to a file descriptor */
+class FDEvent : public Event {
+  public:
+    EventChannel::Descriptor& getDescriptor() const { return descriptor; }
+    int getFDescriptor() const;
+
+  protected:
+    FDEvent(Callback cb, EventChannel::Descriptor& fd)
+        : Event(cb), descriptor(fd) {}
+    // TODO AMS: 1/6/07 I really don't think this is correct, but
+    // the descriptor is immutable
+    FDEvent& operator=(const FDEvent& rhs) { Event::operator=(rhs); return *this; }
+
+  private:
+    EventChannel::Descriptor& descriptor;
+};
+
+/** Base class for read or write events. */
+class IOEvent : public FDEvent {
   public:
-    void getDescriptor() const { return descriptor; }
     size_t getSize() const { return size; }
-    BufT getBuffer() const { return buffer; }
-  
+    
   protected:
-    IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
-        Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+    IOEvent(Callback cb, EventChannel::Descriptor& fd, size_t sz, bool noWait_) :
+        FDEvent(cb, fd), size(sz), noWait(noWait_) {}
 
-    int descriptor;
-    BufT buffer;
     size_t size;
+    bool noWait;
 };
 
 /** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
+class ReadEvent : public IOEvent
 {
   public:
-    explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
-        IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+    explicit ReadEvent(int fd, void* buf=0, size_t sz=0,Callback cb=0, bool noWait=false);
 
+    void* getBuffer() const { return buffer; }
+    size_t getBytesRead() const { return bytesRead; }
+    
   private:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
+    void prepare(EventChannel::Impl&);
+    void complete(EventChannel::Descriptor&);
     ssize_t doRead();
 
-    size_t received;
+    void* buffer;
+    size_t bytesRead;
 };
 
 /** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
+class WriteEvent : public IOEvent
 {
   public:
-    explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
-                        Callback cb=0) :
-        IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+    explicit WriteEvent(int fd, const void* buf=0, size_t sz=0, Callback cb=0);
 
-  protected:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
+    const void* getBuffer() const { return buffer; }
+    size_t getBytesWritten() const { return bytesWritten; }
 
   private:
+    void prepare(EventChannel::Impl&);
+    void complete(EventChannel::Descriptor&);
     ssize_t doWrite();
-    size_t written;
+
+    const void* buffer;
+    size_t bytesWritten;
 };
 
+
 /** Asynchronous socket accept event */
-class AcceptEvent : public Event
+class AcceptEvent : public FDEvent
 {
   public:
     /** Accept a connection on fd. */
-    explicit AcceptEvent(int fd=-1, Callback cb=0) :
-        Event(cb), descriptor(fd), accepted(0) {}
-
-    /** Get descriptor for server socket */
+    explicit AcceptEvent(int fd, Callback cb=0);
+    
+    /** Get descriptor for accepted server socket */
     int getAcceptedDesscriptor() const { return accepted; }
 
   private:
-    void prepare(EventHandler&);
-    Event* complete(EventHandler&);
+    void prepare(EventChannel::Impl&);
+    void complete(EventChannel::Descriptor&);
 
-    int descriptor;
     int accepted;
-};
-
-
-class QueueSet;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
-  public:
-    static shared_ptr create();
-    
-    ~EventChannel();
-    
-    /** Post an event to the channel. */
-    void postEvent(Event& event);
-
-    /** Post an event to the channel. Must not be 0. */
-    void postEvent(Event* event) { postEvent(*event); }
-        
-    /**
-     * Wait for the next complete event.
-     *@return Pointer to event. Will never return 0.
-     */
-    Event* getEvent();
-
-  private:
-    EventChannel();
-    boost::shared_ptr<EventHandler> handler;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp Mon Jun 18 05:11:32 2007
@@ -106,7 +106,7 @@
         if (!isRunning && !isShutdown) {
             isRunning = true;
             factory = f;
-            threads->postEvent(acceptEvent);
+            threads->post(acceptEvent);
         }
     }
     threads->join();            // Wait for shutdown.
@@ -120,7 +120,7 @@
         isShutdown = true;
     }
     if (doShutdown) {
-        ::close(acceptEvent.getDescriptor());
+        ::close(acceptEvent.getFDescriptor());
         threads->shutdown();
         for_each(connections.begin(), connections.end(),
                  boost::bind(&EventChannelConnection::close, _1));
@@ -139,11 +139,11 @@
         shutdown();
         return;
     }
-    // TODO aconway 2006-11-29: Need to reap closed connections also.
     int fd = acceptEvent.getAcceptedDesscriptor();
+    threads->post(acceptEvent); // Keep accepting.
+    // TODO aconway 2006-11-29: Need to reap closed connections also.
     connections.push_back(
         new EventChannelConnection(threads, *factory, fd, fd, isTrace));
-    threads->postEvent(acceptEvent); // Keep accepting.
 }
 
 }} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp Mon Jun 18 05:11:32 2007
@@ -24,7 +24,6 @@
 #include "EventChannelConnection.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/QpidError.h"
-#include "qpid/log/Statement.h"
 
 using namespace std;
 using namespace qpid;
@@ -44,6 +43,8 @@
 ) :
     readFd(rfd),
     writeFd(wfd ? wfd : rfd),
+    readEvent(readFd),
+    writeEvent(writeFd),
     readCallback(boost::bind(&EventChannelConnection::closeOnException,
                              this, &EventChannelConnection::endInitRead)),
 
@@ -55,8 +56,8 @@
     out(bufferSize),
     isTrace(isTrace_)
 {
-    BOOST_ASSERT(readFd > 0);
-    BOOST_ASSERT(writeFd > 0);
+    assert(readFd > 0);
+    assert(writeFd > 0);
     closeOnException(&EventChannelConnection::startRead);
 }
 
@@ -133,14 +134,17 @@
     }
     // No need to lock here - only one thread can be writing at a time.
     out.clear();
-    QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame);
+    if (isTrace)
+        cout << "Send on socket " << writeFd << ": " << *frame << endl;
     frame->encode(out);
     out.flip();
+    // TODO: AMS 1/6/07 This only works because we already have the correct fd
+    // in the descriptor - change not to use assignment
     writeEvent = WriteEvent(
         writeFd, out.start(), out.available(),
         boost::bind(&EventChannelConnection::closeOnException,
                     this, &EventChannelConnection::endWrite));
-    threads->postEvent(writeEvent);
+    threads->post(writeEvent);
 }
 
 // ScopedBusy ctor increments busyThreads.
@@ -161,12 +165,18 @@
     ScopedBusy(*this);
     {
         Monitor::ScopedLock lock(monitor);
+        assert(isWriting);
         isWriting = false;
-        if (isClosed)
+        if (isClosed) 
             return;
         writeEvent.throwIfException();
+        if (writeEvent.getBytesWritten() < writeEvent.getSize()) {
+            // Keep writing the current event till done.
+            isWriting = true;
+            threads->post(writeEvent);
+        }
     }
-    // Check if there's more in to write in the write queue.
+    // Continue writing from writeFrames queue.
     startWrite();
 }
     
@@ -179,8 +189,8 @@
 void EventChannelConnection::startRead() {
     // Non blocking read, as much as we can swallow.
     readEvent = ReadEvent(
-        readFd, in.start(), in.available(), readCallback,true);
-    threads->postEvent(readEvent);
+        readFd, in.start(), in.available(), readCallback);
+    threads->post(readEvent);
 }
 
 // Completion of initial read, expect protocolInit.
@@ -194,7 +204,7 @@
         in.flip();
         ProtocolInitiation protocolInit;
         if(protocolInit.decode(in)){
-            handler->initiated(&protocolInit);
+            handler->initiated(protocolInit);
             readCallback = boost::bind(
                 &EventChannelConnection::closeOnException,
                 this, &EventChannelConnection::endRead);
@@ -215,8 +225,10 @@
         in.flip();
         AMQFrame frame;
         while (frame.decode(in)) {
-            QPID_LOG(trace, "Received on socket " << readFd
-                     << ": " << frame);
+            // TODO aconway 2006-11-30: received should take Frame&
+            if (isTrace)
+                cout << "Received on socket " << readFd
+                     << ": " << frame << endl;
             handler->received(&frame); 
         }
         in.compact();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h Mon Jun 18 05:11:32 2007
@@ -34,7 +34,7 @@
 class ConnectionInputHandlerFactory;
 
 /**
- * Implements ConnectionOutputHandler and delegates to a ConnectionInputHandler
+ * Implements SessionContext and delegates to a SessionHandler
  * for a connection via the EventChannel.
  *@param readDescriptor file descriptor for reading.
  *@param writeDescriptor file descriptor for writing,
@@ -50,7 +50,7 @@
         bool isTrace = false
     );
 
-    // TODO aconway 2006-11-30: ConnectionOutputHandler::send should take auto_ptr
+    // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
     virtual void send(qpid::framing::AMQFrame* frame) {
         send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp Mon Jun 18 05:11:32 2007
@@ -16,27 +16,40 @@
  *
  */
 
-#include "EventChannelThreads.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
 #include <iostream>
-using namespace std;
+#include <limits>
+
 #include <boost/bind.hpp>
 
+#include "qpid/sys/Runnable.h"
+
+#include "EventChannelThreads.h"
+
 namespace qpid {
 namespace sys {
 
+const size_t EventChannelThreads::unlimited =
+    std::numeric_limits<size_t>::max();
+
 EventChannelThreads::shared_ptr EventChannelThreads::create(
-    EventChannel::shared_ptr ec)
+    EventChannel::shared_ptr ec, size_t min, size_t max
+)
 {
-    return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+    return EventChannelThreads::shared_ptr(
+        new EventChannelThreads(ec, min, max));
 }
 
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
-    channel(ec), nWaiting(0), state(RUNNING)
+EventChannelThreads::EventChannelThreads(
+    EventChannel::shared_ptr ec, size_t min, size_t max) :
+    minThreads(std::max(size_t(1), min)),   // always start at least one worker
+    maxThreads(std::max(minThreads, max)),  // cap is never below minThreads, so addThread() can grow the pool
+    channel(ec),
+    nWaiting(0),
+    state(RUNNING)
 {
-    // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
-    addThread();
+    Monitor::ScopedLock l(monitor);         // workers is also modified by addThread() under this lock
+    while (workers.size() < minThreads) 
+        workers.push_back(Thread(*this));
 }
 
 EventChannelThreads::~EventChannelThreads() {
@@ -46,32 +59,30 @@
 
 void EventChannelThreads::shutdown() 
 {
-    ScopedLock lock(*this);
+    Monitor::ScopedLock lock(monitor);
     if (state != RUNNING)       // Already shutting down.
         return;
-    for (size_t i = 0; i < workers.size(); ++i) {
-        channel->postEvent(terminate);
-    }
-    state = TERMINATE_SENT;
-    notify();                // Wake up one join() thread.
+    state = TERMINATING;
+    channel->shutdown();
+    monitor.notify();           // Wake up one join() thread.
 }
 
 void EventChannelThreads::join() 
 {
     {
-        ScopedLock lock(*this);
+        Monitor::ScopedLock lock(monitor);
         while (state == RUNNING)    // Wait for shutdown to start.
-            wait();
+            monitor.wait();
         if (state == SHUTDOWN)      // Shutdown is complete
             return;
         if (state == JOINING) {
             // Someone else is doing the join.
             while (state != SHUTDOWN)
-                wait();
+                monitor.wait();
             return;
         }
         // I'm the  joining thread
-        assert(state == TERMINATE_SENT); 
+        assert(state == TERMINATING); 
         state = JOINING; 
     } // Drop the lock.
 
@@ -80,12 +91,13 @@
         workers[i].join();
     }
     state = SHUTDOWN;
-    notifyAll();                // Notify other join() threaeds.
+    monitor.notifyAll();        // Notify any other join() threads.
 }
 
 void EventChannelThreads::addThread() {
-    ScopedLock l(*this);
-    workers.push_back(Thread(*this));
+    Monitor::ScopedLock l(monitor);
+    if (workers.size() < maxThreads)
+        workers.push_back(Thread(*this));
 }
 
 void EventChannelThreads::run()
@@ -94,23 +106,20 @@
     AtomicCount::ScopedIncrement inc(nWaiting);
     try {
         while (true) {
-            Event* e = channel->getEvent(); 
+            Event* e = channel->wait(); 
             assert(e != 0);
-            if (e == &terminate) {
-                return;
-            }
             AtomicCount::ScopedDecrement dec(nWaiting);
-            // I'm no longer waiting, make sure someone is.
-            if (dec == 0)
+            // Make sure there's at least one waiting thread.
+            if (dec == 0 && state == RUNNING)
                 addThread();
             e->dispatch();
         }
     }
-    catch (const std::exception& e) {
-        QPID_LOG(error, e.what());
+    catch (const EventChannel::ShutdownException& e) {
+        return;
     }
-    catch (...) {
-        QPID_LOG(error, "unknown exception");
+    catch (const std::exception& e) {
+        Exception::log(e, "Exception in EventChannelThreads::run()");
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h Mon Jun 18 05:11:32 2007
@@ -18,14 +18,16 @@
  * limitations under the License.
  *
  */
-#include <vector>
+#include "EventChannel.h"
 
 #include "qpid/Exception.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/AtomicCount.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Thread.h"
-#include "qpid/sys/AtomicCount.h"
-#include "EventChannel.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Runnable.h"
+
+#include <vector>
 
 namespace qpid {
 namespace sys {
@@ -33,26 +35,33 @@
 /**
    Dynamic thread pool serving an EventChannel.
 
-   Threads run a loop { e = getEvent(); e->dispatch(); }
+   Threads run a loop { e = wait(); e->dispatch(); }
    The size of the thread pool is automatically adjusted to optimal size.
 */
 class EventChannelThreads :
         public qpid::SharedObject<EventChannelThreads>,
-        public sys::Monitor, private sys::Runnable
+        private sys::Runnable
 {
   public:
-    /** Create the thread pool and start initial threads. */
+    /** Constant to represent an unlimited number of threads */ 
+    static const size_t unlimited;
+    
+    /**
+     * Create the thread pool and start initial threads.
+     * @param minThreads Pool will initially contain minThreads threads and
+     * will never shrink to less until shutdown.
+     * @param maxThreads Pool will never grow to more than maxThreads. 
+     */
     static EventChannelThreads::shared_ptr create(
-        EventChannel::shared_ptr channel
+        EventChannel::shared_ptr channel = EventChannel::create(),
+        size_t minThreads = 1,
+        size_t maxThreads = unlimited
     );
 
     ~EventChannelThreads();
 
     /** Post event to the underlying channel */
-    void postEvent(Event& event) { channel->postEvent(event); }
-
-    /** Post event to the underlying channel Must not be 0. */
-    void postEvent(Event* event) { channel->postEvent(event); }
+    void post(Event& event) { channel->post(event); }
 
     /**
      * Terminate all threads.
@@ -68,21 +77,25 @@
   private:
     typedef std::vector<sys::Thread> Threads;
     typedef enum {
-        RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+        RUNNING, TERMINATING, JOINING, SHUTDOWN
     } State;
 
-    EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+    EventChannelThreads(
+        EventChannel::shared_ptr channel, size_t min, size_t max);
+    
     void addThread();
 
     void run();
     bool keepRunning();
     void adjustThreads();
 
+    Monitor monitor;
+    size_t minThreads;
+    size_t maxThreads;
     EventChannel::shared_ptr channel;
     Threads workers;
     sys::AtomicCount nWaiting;
     State state;
-    Event terminate;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h?view=diff&rev=548337&r1=548336&r2=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h Mon Jun 18 05:11:32 2007
@@ -43,7 +43,7 @@
     
     int getErrNo() { return errNo; }
 
-    Exception* clone() const throw() { return new PosixError(*this); }
+    Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); }
         
     void throwSelf() const { throw *this; }
 
@@ -55,6 +55,10 @@
 
 /** Create a PosixError for the current file/line and errno. */
 #define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+
+/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+#define QPID_POSIX_CHECK(RESULT)                        \
+    if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
 
 /** Throw a posix error if errNo is non-zero */
 #define QPID_POSIX_THROW_IF(ERRNO)              \

Added: incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DispatcherTest.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <iostream>
+#include <boost/bind.hpp>
+
+using namespace std;
+using namespace qpid::sys;
+
+// Write s to fd over and over until fd stops accepting data; returns total bytes written.
+int writeALot(int fd, const string& s) {
+    int bytesWritten = 0;
+    for (;;) {
+        int lastWrite = ::write(fd, s.c_str(), s.size());
+        if (lastWrite > 0) bytesWritten += lastWrite;
+        else if (lastWrite == 0 || errno != EINTR)
+            break;  // nothing taken, EAGAIN (buffer full) or a hard error: stop instead of spinning
+    }
+    return bytesWritten;
+}
+
+// Drain fd until EOF, EAGAIN or a hard error; returns total bytes read.
+int readALot(int fd) {
+    int bytesRead = 0;
+    char buf[10240];
+    for (;;) {
+        int lastRead = ::read(fd, buf, sizeof(buf));
+        if (lastRead > 0)
+            bytesRead += lastRead;
+        else if (lastRead == 0 || errno != EINTR)
+            break;  // 0 means EOF (peer closed); otherwise EAGAIN or a hard error
+    }
+    return bytesRead;
+}
+
+int64_t writtenBytes = 0;
+int64_t readBytes = 0;
+
+void writer(DispatchHandle& h, int fd, const string& s) {
+    writtenBytes += writeALot(fd, s);   // add this burst to the global total that main() prints
+    h.rewatch();    // NOTE(review): presumably re-registers h for further events - confirm in Dispatcher.h
+}
+
+void reader(DispatchHandle& h, int fd) {
+    readBytes += readALot(fd);  // add this burst to the global total that main() prints
+    h.rewatch();    // NOTE(review): presumably re-registers h for further events - confirm in Dispatcher.h
+}
+
+int main(int argc, char** argv)
+{
+    // Create poller
+    Poller::shared_ptr poller(new Poller);
+    
+    // Create two dispatchers sharing the poller, each run on its own thread
+    Dispatcher d(poller);
+    Dispatcher d1(poller);
+    //Dispatcher d2(poller);
+    //Dispatcher d3(poller);
+    Thread dt(d);
+    Thread dt1(d1);
+    //Thread dt2(d2);
+    //Thread dt3(d3);
+
+    // Setup sender and receiver: a connected local socket pair
+    int sv[2];
+    int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv);
+    assert(rc >= 0);
+    
+    // Set both ends non-blocking
+    rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+    assert(rc >= 0);
+
+    rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+    assert(rc >= 0);
+    
+    // Make up a large string: the seed text doubled 8 times (256 copies)
+    string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
+    for (int i = 0; i < 8; i++)
+        testString += testString;
+
+    DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0);
+    DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString));    
+
+    rh.watch(poller);
+    wh.watch(poller);
+
+    // Let the dispatchers run for 60 seconds, then shut down
+    sleep(60);
+    
+    poller->shutdown();
+    dt.join();
+    dt1.join();
+    //dt2.join();
+    //dt3.join();
+
+    cout << "Wrote: " << writtenBytes << "\n";
+    cout << "Read: " << readBytes << "\n";
+    
+    return 0;
+}

Added: incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?view=auto&rev=548337
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Mon Jun 18 05:11:32 2007
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Use socketpair to test the poller
+ */
+
+#include "qpid/sys/Poller.h"
+
+#include <string>
+#include <iostream>
+#include <memory>
+#include <exception>
+
+#include <assert.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+using namespace std;
+using namespace qpid::sys;
+
+// Write s to fd over and over until fd stops accepting data; returns total bytes written.
+int writeALot(int fd, const string& s) {
+    int bytesWritten = 0;
+    for (;;) {
+        int lastWrite = ::write(fd, s.c_str(), s.size());
+        if (lastWrite > 0) bytesWritten += lastWrite;
+        else if (lastWrite == 0 || errno != EINTR)
+            break;  // nothing taken, EAGAIN (buffer full) or a hard error: stop instead of spinning
+    }
+    return bytesWritten;
+}
+
+// Drain fd until EOF, EAGAIN or a hard error; returns total bytes read.
+int readALot(int fd) {
+    int bytesRead = 0;
+    char buf[1024];
+    for (;;) {
+        int lastRead = ::read(fd, buf, sizeof(buf));
+        if (lastRead > 0)
+            bytesRead += lastRead;
+        else if (lastRead == 0 || errno != EINTR)
+            break;  // 0 means EOF (peer closed); otherwise EAGAIN or a hard error
+    }
+    return bytesRead;
+}
+
+int main(int argc, char** argv)
+{
+    try 
+    {
+        int sv[2];
+        int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv);
+        assert(rc >= 0);
+        
+        // Set non-blocking
+        rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
+        assert(rc >= 0);
+
+        rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
+        assert(rc >= 0);
+        
+        // Make up a large string
+        string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
+        for (int i = 0; i < 6; i++)
+            testString += testString;
+
+        // Read as much as we can from socket 0
+        int bytesRead = readALot(sv[0]);
+        assert(bytesRead == 0);
+        cout << "Read(0): " << bytesRead << " bytes\n";
+
+        // Write as much as we can to socket 0
+        int bytesWritten = writeALot(sv[0], testString);
+        cout << "Wrote(0): " << bytesWritten << " bytes\n";
+        
+        // Read as much as we can from socket 1
+        bytesRead = readALot(sv[1]);
+        assert(bytesRead == bytesWritten);
+        cout << "Read(1): " << bytesRead << " bytes\n";
+
+        auto_ptr<Poller> poller(new Poller);
+        
+        PollerHandle h0(sv[0]);
+        PollerHandle h1(sv[1]);
+        
+        poller->addFd(h0, Poller::INOUT);
+        
+        // Wait (no timeout passed) - h0 should be writable
+        Poller::Event event = poller->wait();
+        assert(event.handle == &h0);
+        assert(event.dir == Poller::OUT);
+        
+        // Write as much as we can to socket 0
+        bytesWritten = writeALot(sv[0], testString);
+        cout << "Wrote(0): " << bytesWritten << " bytes\n";
+        
+        // Wait for 500ms - h0 no longer writable
+        poller->rearmFd(h0);
+        event = poller->wait(500000000);
+        assert(event.handle == 0);
+
+        // Test we can read it all now
+        poller->addFd(h1, Poller::INOUT);
+        event = poller->wait();
+        assert(event.handle == &h1);
+        assert(event.dir == Poller::INOUT);
+        
+        bytesRead = readALot(sv[1]);
+        assert(bytesRead == bytesWritten);
+        cout << "Read(1): " << bytesRead << " bytes\n";
+        
+        // At this point h1 should have been disabled from the poller
+        // (as it was just returned) and h0 can write again
+        event = poller->wait();
+        assert(event.handle == &h0);
+        assert(event.dir == Poller::OUT);    
+
+        // Now both the handles should be disabled
+        event = poller->wait(500000000);
+        assert(event.handle == 0);
+        
+        // Test shutdown
+        poller->shutdown();
+        event = poller->wait();
+        assert(event.handle == 0);
+        assert(event.dir == Poller::SHUTDOWN);
+
+        event = poller->wait();
+        assert(event.handle == 0);
+        assert(event.dir == Poller::SHUTDOWN);
+
+        poller->delFd(h1);
+        poller->delFd(h0);
+        return 0;
+    } catch (exception& e) {
+        cout << "Caught exception  " << e.what() << "\n";
+        return 1;   // an exception is a test failure: do not fall off main() and exit 0
+    }
+}
+
+