You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/01 20:01:11 UTC

svn commit: r673158 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/ qpid/sys/ tests/

Author: aconway
Date: Tue Jul  1 11:01:11 2008
New Revision: 673158

URL: http://svn.apache.org/viewvc?rev=673158&view=rev
Log:
Added timeout to SubscriptionManager::get(), LocalQueue::get() and BlockingQueue::get()

Removed:
    incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Tue Jul  1 11:01:11 2008
@@ -31,14 +31,25 @@
 LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
 LocalQueue::~LocalQueue() {}
 
-Message LocalQueue::pop() {
+Message LocalQueue::pop() { return get(); }
+
+Message LocalQueue::get() {
+    Message result;
+    bool ok = get(result, sys::TIME_INFINITE);
+    assert(ok); (void) ok;
+    return result;
+}
+
+bool LocalQueue::get(Message& result, sys::Duration timeout) {
     if (!queue)
         throw ClosedException();
-    FrameSet::shared_ptr content = queue->pop();
+    FrameSet::shared_ptr content;
+    bool ok = queue->pop(content, timeout);
+    if (!ok) return false;
     if (content->isA<MessageTransferBody>()) {
-        Message m(*content);
-        autoAck.ack(m, session);
-        return m;
+        result = Message(*content);
+        autoAck.ack(result, session);
+        return true;
     }
     else
         throw CommandInvalidException(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Tue Jul  1 11:01:11 2008
@@ -25,6 +25,7 @@
 #include "qpid/client/Message.h"
 #include "qpid/client/Demux.h"
 #include "qpid/client/AckPolicy.h"
+#include "qpid/sys/Time.h"
 
 namespace qpid {
 namespace client {
@@ -42,7 +43,7 @@
   public:
     /** Create a local queue. Subscribe the local queue to a remote broker
      * queue with a SubscriptionManager.
-     * 
+     *
      * LocalQueue is an alternative to implementing a MessageListener.
      * 
      *@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
@@ -51,14 +52,22 @@
 
     ~LocalQueue();
 
-    /** Pop the next message off the local queue.
+    /** Wait up to timeout for the next message from the local queue.
+     *@param result Set to the message from the queue.
+     *@param timeout wait up to this timeout for a message to appear.
+     *@return true if result was set, false if queue was empty after timeout.
+     */
+    bool get(Message& result, sys::Duration timeout=0);
+
+    /** Get the next message off the local queue, or wait for a
+     * message from the broker queue.
      *@exception ClosedException if subscription has been closed.
      */
+    Message get();
+
+    /** Synonym for get(). */
     Message pop();
 
-    /** Synonym for pop(). */
-    Message get() { return pop(); }
-    
     /** Return true if local queue is empty. */
     bool empty() const;
 
@@ -72,10 +81,11 @@
     AckPolicy& getAckPolicy();
 
   private:
-    friend class SubscriptionManager;
     Session session;
     Demux::QueuePtr queue;
     AckPolicy autoAck;
+
+  friend class SubscriptionManager;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Tue Jul  1 11:01:11 2008
@@ -129,10 +129,13 @@
     dispatcher.stop();
 }
 
-Message SubscriptionManager::get(const std::string& queue) {
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
     LocalQueue lq;
-    subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str());
-    return lq.get();
+    std::string unique = framing::Uuid(true).str();
+    subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+    AutoCancel ac(*this, unique);
+    sync(session).messageFlush(unique);
+    return lq.get(result, timeout);
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Tue Jul  1 11:01:11 2008
@@ -122,10 +122,14 @@
                    const std::string& queue,
                    const std::string& tag=std::string());
 
-    /**
-     * Get a single message from a queue.
+
+    /** Get a single message from a queue.
+     *@param result is set to the message from the queue.
+     *@param queue name of the queue to get the message from.
+     *@param timeout wait up to this timeout for a message to appear.
+     *@return true if result was set, false if no message available after timeout.
      */
-    Message get(const std::string& queue);
+    bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
 
     /** Cancel a subscription. */
     void cancel(const std::string tag);
@@ -191,6 +195,15 @@
      AckPolicy& getAckPolicy();
 };
 
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+  public:
+    AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+    ~AutoCancel() { sm.cancel(tag); }
+  private:
+    SubscriptionManager& sm;
+    std::string tag;
+};
 
 }} // namespace qpid::client
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Tue Jul  1 11:01:11 2008
@@ -35,47 +35,45 @@
 template <class T>
 class BlockingQueue
 {
-    mutable sys::Waitable lock;
+    mutable sys::Waitable waitable;
     std::queue<T> queue;
-    bool closed;
 
 public:
-    BlockingQueue() : closed(false) {}
+    BlockingQueue() {}
     ~BlockingQueue() { close(); }
 
-    /** Block until there is a value to pop */
-    T pop()
-    {
-        Waitable::ScopedLock l(lock);
-        if (!queueWait()) throw ClosedException();
-        return popInternal();
-    }
-
-    /** Non-blocking pop. If there is a value set outValue and return
-     * true, else return false;
+    /** Pop from the queue, block up to timeout if empty.
+     *@param result Set to value popped from queue.
+     *@param timeout Defaults to infinite.
+     *@return true if result was set, false if queue empty after timeout.
      */
-    bool tryPop(T& outValue) {
-        Waitable::ScopedLock l(lock);
+    bool pop(T& result, Duration timeout=TIME_INFINITE) {
+        Mutex::ScopedLock l(waitable);
+        {
+            Waitable::ScopedWait w(waitable);
+            AbsTime deadline(now(),timeout);
+            while (queue.empty()) waitable.wait(deadline);
+        }
         if (queue.empty()) return false;
-        outValue = popInternal();
+        result = queue.front();
+        queue.pop();
+        if (!queue.empty())
+            waitable.notify();  // Notify another waiter.
         return true;
     }
 
-    /** Non-blocking pop. If there is a value return it, else return
-     * valueIfEmpty.
-     */
-    T tryPop(const T& valueIfEmpty=T()) {
-        T result=valueIfEmpty;
-        tryPop(result);
+    T pop() {
+        T result;
+        bool ok = pop(result);
+        assert(ok); (void) ok;  // Infinite wait.
         return result;
     }
-
+        
     /** Push a value onto the queue */
-    void push(const T& t)
-    {
-        Waitable::ScopedLock l(lock);
+    void push(const T& t) {
+        Mutex::ScopedLock l(waitable);
         queue.push(t);
-        queueNotify(0);
+        waitable.notify();      // Notify a waiter.
     }
 
     /**
@@ -84,56 +82,33 @@
      */ 
     void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
     {
-        Waitable::ScopedLock l(lock);
-        if (!closed) {
-            lock.setException(ex);
-            closed = true;
-            lock.notifyAll();
-            lock.waitWaiters(); // Ensure no threads are still waiting.
+        Mutex::ScopedLock l(waitable);
+        if (!waitable.hasException()) {
+            waitable.setException(ex);
+            waitable.notifyAll();
+            waitable.waitWaiters(); // Ensure no threads are still waiting.
         }
     }
 
     /** Open a closed queue. */
     void open() {
-        Waitable::ScopedLock l(lock);
-        closed=false;
+        Mutex::ScopedLock l(waitable);
+        waitable.resetException();
     }
 
     bool isClosed() const { 
-        Waitable::ScopedLock l(lock);
-        return closed;
+        Mutex::ScopedLock l(waitable);
+        return waitable.hasException();
     }
 
     bool empty() const {
-        Waitable::ScopedLock l(lock);
+        Mutex::ScopedLock l(waitable);
         return queue.empty();
     }    
     size_t size() const {
-        Waitable::ScopedLock l(lock);
+        Mutex::ScopedLock l(waitable);
         return queue.size();
     }    
-
-  private:
-
-    void queueNotify(size_t ignore) {
-        if (!queue.empty() && lock.hasWaiters()>ignore)
-            lock.notify();      // Notify another waiter.
-    }
-
-    bool queueWait() {
-        Waitable::ScopedWait w(lock);
-        while (!closed && queue.empty())
-            lock.wait();
-        return !queue.empty();
-    }
-
-    T popInternal() {
-        T t=queue.front();
-        queue.pop();
-        queueNotify(1);
-        return t;
-    }
-    
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h Tue Jul  1 11:01:11 2008
@@ -76,6 +76,12 @@
         
     }
 
+    /** True if the waitable has an exception */
+    bool hasException() const { return exception; }
+
+    /** Clear the exception if any */
+    void resetException() { exception.reset(); }
+
     /** Throws an exception if one is set before or during the wait. */
     void wait() {
         ExCheck e(exception);
@@ -88,8 +94,6 @@
         return Monitor::wait(absoluteTime);
     }
 
-    ExceptionHolder exception;
-
   private:
     struct ExCheck {
         const ExceptionHolder& exception;
@@ -98,6 +102,8 @@
     };
         
     size_t waiters;
+    ExceptionHolder exception;
+
   friend struct ScopedWait;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Tue Jul  1 11:01:11 2008
@@ -41,6 +41,7 @@
 using namespace qpid::framing;
 using namespace qpid;
 using qpid::sys::Monitor;
+using qpid::sys::TIME_SEC;
 using std::string;
 using std::cout;
 using std::endl;
@@ -242,8 +243,11 @@
     fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
     fix.session.messageTransfer(content=Message("foo0", "getq"));
     fix.session.messageTransfer(content=Message("foo1", "getq"));
-    BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData());
-    BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData());
+    Message got;
+    BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+    BOOST_CHECK_EQUAL("foo0", got.getData());
+    BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+    BOOST_CHECK_EQUAL("foo1", got.getData());
 }
 
 QPID_AUTO_TEST_CASE(testOpenFailure) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=673158&r1=673157&r2=673158&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul  1 11:01:11 2008
@@ -164,22 +164,18 @@
     push_back(new BrokerFixture(opts));
 }
 
-#if 0                           // FIXME aconway 2008-06-26: TODO
-
-
+#if 0
 QPID_AUTO_TEST_CASE(testWiringReplication) {
-    const size_t SIZE=3;
-    ClusterFixture cluster(SIZE);
+    ClusterFixture cluster(3);
     Client c0(cluster[0].getPort());
     BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
     BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); 
     c0.session.queueDeclare("q");
     c0.session.exchangeDeclare("ex", arg::type="direct");
-    BOOST_CHECK_EQUAL("q", c0.session.queueQuery("q").getQueue());
-    BOOST_CHECK_EQUAL("direct", c0.session.exchangeQuery("ex").getType());
-
+    c0.session.close();
     // Verify all brokers get wiring update.
-    for (size_t i = 1; i < cluster.size(); ++i) {
+    for (size_t i = 0; i < cluster.size(); ++i) {
+        BOOST_MESSAGE("i == "<< i);
         Client c(cluster[i].getPort());
         BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
         BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
@@ -188,24 +184,15 @@
 
 QPID_AUTO_TEST_CASE(testMessageReplication) {
     // Enqueue on one broker, dequeue on another.
-    ClusterConnections cluster;
-    BOOST_REQUIRE(cluster.size() > 1);
-
-    Session broker0 = cluster[0]->newSession();
-    broker0.queueDeclare(queue="q");
-    broker0.messageTransfer(content=TransferContent("data", "q"));
-    broker0.close();
-    
-    Session broker1 = cluster[1]->newSession();
-    broker1.
-        c.session.messageSubscribe(queue="q", destination="q");
-        c.session.messageFlow(destination="q", unit=0, value=1);//messages
-        FrameSet::shared_ptr msg = c.session.get();
-        BOOST_CHECK(msg->isA<MessageTransferBody>());
-        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
-        c.session.getExecution().completed(msg->getId(), true, true);
-        cluster[i]->close();
-    }    
+    ClusterFixture cluster(2);
+    Client c0(cluster[0].getPort());
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=TransferContent("data", "q"));
+    c0.session.close();
+    Client c1(cluster[1].getPort());
+    Message msg;
+    BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+    BOOST_CHECK_EQUAL(string("data"), msg.getData());
 }
 
 // TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.