You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/09 22:38:23 UTC

svn commit: r792676 - in /qpid/trunk/qpid/cpp/src: qpid/sys/PollableCondition.h qpid/sys/PollableQueue.h qpid/sys/posix/PollableCondition.cpp qpid/sys/windows/PollableCondition.cpp tests/PollableCondition.cpp

Author: aconway
Date: Thu Jul  9 20:38:23 2009
New Revision: 792676

URL: http://svn.apache.org/viewvc?rev=792676&view=rev
Log:
Simplified PollableCondition

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
    qpid/trunk/qpid/cpp/src/tests/PollableCondition.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h?rev=792676&r1=792675&r2=792676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h Thu Jul  9 20:38:23 2009
@@ -44,28 +44,13 @@
 
     /**
      * Set the condition. Triggers callback to Callback from Poller.
-     * When callback is made, condition is suspended. Call rearm() to
-     * resume reacting to the condition.
      */
     QPID_COMMON_EXTERN void set();
 
     /**
-     * Get the current state of the condition, then clear it.
-     *
-     * @return The state of the condition before it was cleared.
+     * Clear the condition. Stops callbacks from Poller.
      */
-    QPID_COMMON_EXTERN bool clear();
-
-    /**
-     * Temporarily suspend the ability for the poller to react to the
-     * condition. It can be rearm()ed later.
-     */
-    QPID_COMMON_EXTERN void disarm();
-
-    /**
-     * Reset the ability for the poller to react to the condition.
-     */
-    QPID_COMMON_EXTERN void rearm();
+    QPID_COMMON_EXTERN void clear();
 
  private:
     PollableConditionPrivate *impl;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=792676&r1=792675&r2=792676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Jul  9 20:38:23 2009
@@ -119,7 +119,6 @@
     if (!stopped) return;
     stopped = false;
     if (!queue.empty()) condition.set();
-    condition.rearm();
 }
 
 template <class T> PollableQueue<T>::~PollableQueue() {
@@ -139,7 +138,6 @@
     dispatcher = Thread();
     if (queue.empty()) cond.clear();
     if (stopped) lock.notifyAll();
-    else cond.rearm();
 }
 
 template <class T> void PollableQueue<T>::process() {
@@ -166,11 +164,11 @@
 template <class T> void PollableQueue<T>::stop() {
     ScopedLock l(lock);
     if (stopped) return;
-    condition.disarm();
+    condition.clear();
     stopped = true;
     // Avoid deadlock if stop is called from the dispatch thread
-    while (dispatcher.id() && dispatcher.id() != Thread::current().id())
-        lock.wait();
+    if (dispatcher.id() != Thread::current().id())
+        while (dispatcher.id()) lock.wait();
 }
 
 }} // namespace qpid::sys

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp?rev=792676&r1=792675&r2=792676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp Thu Jul  9 20:38:23 2009
@@ -46,8 +46,8 @@
     ~PollableConditionPrivate();
 
     void dispatch(sys::DispatchHandle& h);
-    void rewatch();
-    void unwatch();
+    void set();
+    void clear();
 
 private:
     PollableCondition::Callback cb;
@@ -57,10 +57,11 @@
     std::auto_ptr<DispatchHandleRef> handle;
 };
 
-PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
-                                                   sys::PollableCondition& parent,
-                                                   const boost::shared_ptr<sys::Poller>& poller)
-  : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
+PollableConditionPrivate::PollableConditionPrivate(
+    const sys::PollableCondition::Callback& cb,
+    sys::PollableCondition& parent,
+    const boost::shared_ptr<sys::Poller>& poller
+) : IOHandle(new sys::IOHandlePrivate), cb(cb), parent(parent)
 {
     int fds[2];
     if (::pipe(fds) == -1)
@@ -71,39 +72,41 @@
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
     if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
         throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
-    handle.reset (new DispatchHandleRef(*this,
-                                        boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
-                                        0, 0));
+    handle.reset (new DispatchHandleRef(
+                      *this,
+                      boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
+                      0, 0));
     handle->startWatch(poller);
     handle->unwatch();
+
+    // Make the read FD readable
+    static const char dummy=0;
+    ssize_t n = ::write(writeFd, &dummy, 1);
+    if (n == -1 && errno != EAGAIN)
+        throw ErrnoException("Error setting PollableCondition");
 }
 
-PollableConditionPrivate::~PollableConditionPrivate()
-{
+PollableConditionPrivate::~PollableConditionPrivate() {
     handle->stopWatch();
     close(writeFd);
 }
 
-void PollableConditionPrivate::dispatch(sys::DispatchHandle& /*h*/)
-{
+void PollableConditionPrivate::dispatch(sys::DispatchHandle&) {
     cb(parent);
 }
 
-void PollableConditionPrivate::rewatch()
-{
+void PollableConditionPrivate::set() {
     handle->rewatch();
 }
 
-void PollableConditionPrivate::unwatch()
-{
+void PollableConditionPrivate::clear() {
     handle->unwatch();
 }
 
-  /* PollableCondition */
 
 PollableCondition::PollableCondition(const Callback& cb,
-                                     const boost::shared_ptr<sys::Poller>& poller)
-  : impl(new PollableConditionPrivate(cb, *this, poller))
+                                     const boost::shared_ptr<sys::Poller>& poller
+) : impl(new PollableConditionPrivate(cb, *this, poller))
 {
 }
 
@@ -112,75 +115,9 @@
     delete impl;
 }
 
-void PollableCondition::set() {
-    static const char dummy=0;
-    ssize_t n = ::write(impl->writeFd, &dummy, 1);
-    if (n == -1 && errno != EAGAIN)
-        throw ErrnoException("Error setting PollableCondition");
-}
-
-bool PollableCondition::clear() {
-    char buf[256];
-    ssize_t n;
-    bool wasSet = false;
-    while ((n = ::read(impl->impl->fd, buf, sizeof(buf))) > 0) 
-        wasSet = true;
-    if (n == -1 && errno != EAGAIN)
-        throw ErrnoException(QPID_MSG("Error clearing PollableCondition"));
-    return wasSet;
-}
-
-void PollableCondition::disarm() {
-    impl->unwatch();
-}
-
-void PollableCondition::rearm() {
-    impl->rewatch();
-}
-
-
-#if 0
-// FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call.  Move to separate file & do configure.ac test
-// to enable this when ::eventfd() is available.
-
-#include <sys/eventfd.h>
-
-namespace qpid {
-namespace sys {
-
-PollableConditionPrivate::PollableConditionPrivate(const PollableCondition::Callback& cb,
-                                                   sys::PollableCondition& parent,
-                                                   const boost::shared_ptr<sys::Poller>& poller)
-  : cb(cb), parent(parent), poller(poller),
-    IOHandle(new sys::IOHandlePrivate) {
-    impl->fd = ::eventfd(0, 0);
-    if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
-}
-
-void PollableCondition::set() {
-    static const uint64_t value=1;
-    ssize_t n = ::write(impl->impl->fd,
-                        reinterpret_cast<const void*>(&value), 8);
-    if (n != 8) throw ErrnoException("write failed on conditionfd");
-}
-
-bool PollableCondition::clear() {
-    char buf[8];
-    ssize_t n = ::read(impl->impl->fd, buf, 8);
-    if (n != 8) throw ErrnoException("read failed on conditionfd");
-    return *reinterpret_cast<uint64_t*>(buf);
-}
-
-void PollableCondition::disarm() {
-  // ????
-}
+void PollableCondition::set() { impl->set(); }
 
-void PollableCondition::rearm() {
-  // ????
-}
-    
-#endif
+void PollableCondition::clear() { impl->clear(); }
 
 }} // namespace qpid::sys
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=792676&r1=792675&r2=792676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Thu Jul  9 20:38:23 2009
@@ -107,17 +107,8 @@
     impl->poke();
 }
 
-bool PollableCondition::clear() {
-    return (0 != ::InterlockedExchange(&impl->isSet, 0));
-}
-
-void PollableCondition::disarm() {
-    ::InterlockedExchange(&impl->armed, 0);
-}
-
-void PollableCondition::rearm() {
-    if (0 == ::InterlockedExchange(&impl->armed, 1) && impl->isSet)
-        impl->poke();
+void PollableCondition::clear() {
+    ::InterlockedExchange(&impl->isSet, 0);
 }
 
 }} // namespace qpid::sys

Modified: qpid/trunk/qpid/cpp/src/tests/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollableCondition.cpp?rev=792676&r1=792675&r2=792676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollableCondition.cpp Thu Jul  9 20:38:23 2009
@@ -38,7 +38,7 @@
 
 class  Callback {
   public:    
-    enum Action { NONE, DISARM, CLEAR, DISARM_CLEAR };
+    enum Action { NONE, CLEAR };
 
     Callback() : count(), action(NONE) {}
 
@@ -47,9 +47,7 @@
         ++count;
         switch(action) {
           case NONE: break; 
-          case DISARM:  pc.disarm(); break;
           case CLEAR: pc.clear(); break;
-          case DISARM_CLEAR: pc.disarm(); pc.clear(); break;
         }
         action = NONE;
         lock.notify();
@@ -86,27 +84,19 @@
 
     Thread runner = Thread(*poller);
     
-    BOOST_CHECK(callback.isNotCalling()); // condition is not set or armed.
-
-    pc.rearm();                          
-    BOOST_CHECK(callback.isNotCalling()); // Armed but not set
+    BOOST_CHECK(callback.isNotCalling()); // condition is not set.
 
     pc.set();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    BOOST_CHECK(callback.isCalling()); // Still armed and set.
-
-    callback.nextCall(Callback::DISARM);
-    BOOST_CHECK(callback.isNotCalling()); // set but not armed
+    BOOST_CHECK(callback.isCalling()); // Set.
+    BOOST_CHECK(callback.isCalling()); // Still set.
 
-    pc.rearm();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    callback.nextCall(Callback::CLEAR);    
-    BOOST_CHECK(callback.isNotCalling()); // armed but not set
+    callback.nextCall(Callback::CLEAR);
+    BOOST_CHECK(callback.isNotCalling()); // Cleared
 
     pc.set();
-    BOOST_CHECK(callback.isCalling()); // Armed and set.
-    callback.nextCall(Callback::DISARM_CLEAR);    
-    BOOST_CHECK(callback.isNotCalling()); // not armed or set.
+    BOOST_CHECK(callback.isCalling()); // Set.
+    callback.nextCall(Callback::CLEAR);
+    BOOST_CHECK(callback.isNotCalling()); // Cleared.
 
     poller->shutdown();
     runner.join();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org