You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mt...@apache.org on 2009/03/10 18:30:07 UTC

svn commit: r752174 - /qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp

Author: mteira
Date: Tue Mar 10 17:30:06 2009
New Revision: 752174

URL: http://svn.apache.org/viewvc?rev=752174&view=rev
Log:
Changes to the Solaris ECF-based poller to match the new Poller architecture

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp?rev=752174&r1=752173&r2=752174&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp Tue Mar 10 17:30:06 2009
@@ -30,9 +30,11 @@
 #include <port.h>
 #include <poll.h>
 #include <errno.h>
+#include <pthread.h>
+#include <signal.h>
 
 #include <assert.h>
-#include <vector>
+#include <queue>
 #include <exception>
 
 
@@ -43,11 +45,11 @@
 namespace sys {
 
 // Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used 
-DeletionManager<PollerHandle> PollerHandleDeletionManager;
+DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
 
 //  Instantiate (and define) class static for DeletionManager
 template <>
-DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0);
+DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
 
 class PollerHandlePrivate {
     friend class Poller;
@@ -58,7 +60,8 @@
         MONITORED,
         INACTIVE,
         HUNGUP,
-        MONITORED_HUNGUP
+        MONITORED_HUNGUP,
+        DELETED
     };
 
     int fd;
@@ -104,6 +107,14 @@
         assert(stat == MONITORED);
         stat = HUNGUP;
     }
+
+    bool isDeleted() const {
+        return stat == DELETED;
+    }
+
+    void setDeleted() {
+        stat = DELETED;
+    }
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
@@ -111,11 +122,16 @@
 {}
 
 PollerHandle::~PollerHandle() {
-    delete impl;
-}
-
-void PollerHandle::deferDelete() {
-    PollerHandleDeletionManager.markForDeletion(this);
+    {
+    ScopedLock<Mutex> l(impl->lock);
+    if (impl->isDeleted()) {
+    	return;
+    }
+    if (impl->isActive()) {
+        impl->setDeleted();
+    }
+    }
+    PollerHandleDeletionManager.markForDeletion(impl);
 }
 
 /**
@@ -125,35 +141,80 @@
 class PollerPrivate {
     friend class Poller;
 
-    const int portId;
+    class InterruptHandle: public PollerHandle {
+        std::queue<PollerHandle*> handles;
+
+        void processEvent(Poller::EventType) {
+            PollerHandle* handle = handles.front();
+            handles.pop();
+            assert(handle);
+
+            //Synthesise event
+            Poller::Event event(handle, Poller::INTERRUPTED);
+
+            //Process synthesised event
+            event.process();
+        }
 
+    public:
+        InterruptHandle() : PollerHandle(DummyIOHandle) {}
+
+        void addHandle(PollerHandle& h) {
+            handles.push(&h);
+        }
+
+        PollerHandle *getHandle() {
+            PollerHandle* handle = handles.front();
+            handles.pop();
+            return handle;
+        }
+
+        bool queuedHandles() {
+            return handles.size() > 0;
+        }
+    };
+
+    const int portId;
+    bool isShutdown;
+    InterruptHandle interruptHandle;
+    
     static uint32_t directionToPollEvent(Poller::Direction dir) {
         switch (dir) {
-            case Poller::INPUT:  return POLLIN;
-            case Poller::OUTPUT: return POLLOUT;
-            case Poller::INOUT:  return POLLIN | POLLOUT;
-            default: return 0;
+        case Poller::INPUT:  return POLLIN;
+        case Poller::OUTPUT: return POLLOUT;
+        case Poller::INOUT:  return POLLIN | POLLOUT;
+        default: return 0;
         }
     }
 
     static Poller::EventType pollToDirection(uint32_t events) {
         uint32_t e = events & (POLLIN | POLLOUT);
         switch (e) {
-            case POLLIN: return Poller::READABLE;
-            case POLLOUT: return Poller::WRITABLE;
-            case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
-            default:
-                return (events & (POLLHUP | POLLERR)) ?
-                    Poller::DISCONNECTED : Poller::INVALID;
+        case POLLIN: return Poller::READABLE;
+        case POLLOUT: return Poller::WRITABLE;
+        case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+        default:
+            return (events & (POLLHUP | POLLERR)) ?
+                Poller::DISCONNECTED : Poller::INVALID;
         }
     }
-  
+        
     PollerPrivate() :
-        portId(::port_create()) {
+        portId(::port_create()),
+        isShutdown(false) {
+        QPID_POSIX_CHECK(portId);
     }
 
     ~PollerPrivate() {
     }
+
+    void interrupt() {
+        //Send an Alarm to the port
+        //We need to send a nonzero event mask, using POLLHUP,
+        //nevertheless the wait method will only look for a PORT_ALERT_SET
+        QPID_POSIX_CHECK(::port_alert(portId, PORT_ALERT_SET, POLLHUP,
+                                      &static_cast<PollerHandle&>(interruptHandle)));        
+    }
 };
 
 void Poller::addFd(PollerHandle& handle, Direction dir) {
@@ -177,7 +238,6 @@
     QPID_LOG(trace, "Poller::addFd(handle=" << &handle
              << "[" << typeid(&handle).name()
              << "], fd=" << eh.fd << ")");
-    //assert(dynamic_cast<DispatchHandle*>(&handle));
 }
 
 void Poller::delFd(PollerHandle& handle) {
@@ -223,17 +283,56 @@
 }
 
 void Poller::shutdown() {
-    //Send an Alarm to the port
-    //We need to send a nonzero event mask, using POLLHUP, but
-    //The wait method will only look for a PORT_ALERT_SET
-    QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL));
-    QPID_LOG(trace, "Poller::shutdown");
+    //Allow sloppy code to shut us down more than once
+    if (impl->isShutdown)
+        return;
+
+    impl->isShutdown = true;
+    impl->interrupt();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+    PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
+    PollerHandlePrivate& eh = *static_cast<PollerHandle&>(ih).impl;
+    ScopedLock<Mutex> l(eh.lock);
+    ih.addHandle(handle);
+    impl->interrupt();
+    eh.setActive();
+    return true;
+}
+ 
+void Poller::run() {
+    // Make sure we can't be interrupted by signals at a bad time
+    ::sigset_t ss;
+    ::sigfillset(&ss);
+    ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+    do {
+        Event event = wait();
+
+        // If can read/write then dispatch appropriate callbacks        
+        if (event.handle) {
+            event.process();
+        } else {
+            // Handle shutdown
+            switch (event.type) {
+            case SHUTDOWN:
+                return;
+            default:
+                // This should be impossible
+                assert(false);
+            }
+        }
+    } while (true);
 }
 
 Poller::Event Poller::wait(Duration timeout) {
     timespec_t tout;
     timespec_t* ptout = NULL;
     port_event_t pe;
+
+    AbsTime targetTimeout = (timeout == TIME_INFINITE) ? FAR_FUTURE :
+        AbsTime(now(), timeout);
     
     if (timeout != TIME_INFINITE) {
       tout.tv_sec = 0;
@@ -246,9 +345,15 @@
         QPID_LOG(trace, "About to enter port_get. Thread "
                  << pthread_self()
                  << ", timeout=" << timeout);
-        
+
+
         int rc = ::port_get(impl->portId, &pe, ptout);
 
+        if (impl->isShutdown) {
+            PollerHandleDeletionManager.markAllUnusedInThisThread();
+            return Event(0, SHUTDOWN);
+        }
+
         if (rc < 0) {
             switch (errno) {
             case EINTR:
@@ -259,33 +364,60 @@
                 QPID_POSIX_CHECK(rc);
             }
         } else {
-            //We use alert mode to notify the shutdown of the Poller
-            if (pe.portev_source == PORT_SOURCE_ALERT) {
-                return Event(0, SHUTDOWN);            
-            }
-            if (pe.portev_source == PORT_SOURCE_FD) {
-                PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user);
-                PollerHandlePrivate& eh = *handle->impl;
-                ScopedLock<Mutex> l(eh.lock);
-                QPID_LOG(trace, "About to send handle: " << handle);
+            PollerHandle* handle = static_cast<PollerHandle*>(pe.portev_user);
+            PollerHandlePrivate& eh = *handle->impl;
+            ScopedLock<Mutex> l(eh.lock);
+
+            if (eh.isActive()) {
+                //We use alert mode to notify interrupts
+                if (pe.portev_source == PORT_SOURCE_ALERT &&
+                    handle == &impl->interruptHandle) {
+                    PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
+
+                    if (impl->interruptHandle.queuedHandles()) {
+                        impl->interrupt();
+                        eh.setActive();
+                    } else {
+                        eh.setInactive();
+                    }
+                    return Event(wrappedHandle, INTERRUPTED);
+                }
                 
-                if (eh.isActive()) {
-                  if (pe.portev_events & POLLHUP) {
-                    if (eh.isHungup()) {
-                      return Event(handle, DISCONNECTED);
+                return Event(0, SHUTDOWN);            
+
+                if (pe.portev_source == PORT_SOURCE_FD) {
+                    QPID_LOG(trace, "About to send handle: " << handle);
+                    if (pe.portev_events & POLLHUP) {
+                        if (eh.isHungup()) {
+                            return Event(handle, DISCONNECTED);
+                        }
+                        eh.setHungup();
+                    } else {
+                        eh.setInactive();
                     }
-                    eh.setHungup();
-                  } else {
-                    eh.setInactive();
-                  }
-                  QPID_LOG(trace, "Sending event (thread: "
-                           << pthread_self() << ") for handle " << handle
-                           << ", direction= "
-                           << PollerPrivate::pollToDirection(pe.portev_events));
-                  return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
+                    QPID_LOG(trace, "Sending event (thread: "
+                             << pthread_self() << ") for handle " << handle
+                             << ", direction= "
+                             << PollerPrivate::pollToDirection(pe.portev_events));
+                    return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
+                }
+            } else if (eh.isDeleted()) {
+                //Remove the handle from the poller
+                int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD,
+                                           (uintptr_t) eh.fd);
+                if (rc == -1 && errno != EBADFD) {
+                    QPID_POSIX_CHECK(rc);
                 }
             }
         }
+
+        if (timeout == TIME_INFINITE) {
+            continue;
+        }
+        if (rc == 0 && now() > targetTimeout) {
+            PollerHandleDeletionManager.markAllUnusedInThisThread();
+            return Event(0, TIMEOUT);
+        }
     } while (true);
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org