You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/10 16:44:03 UTC

svn commit: r675598 - in /incubator/qpid/branches/qpid.0-10/cpp/src: ./ qpid/client/ qpid/sys/ tests/

Author: gsim
Date: Thu Jul 10 07:44:01 2008
New Revision: 675598

URL: http://svn.apache.org/viewvc?rev=675598&view=rev
Log:
Add a get() method to subscription manager that retrieves one message from the specified queue if available, returns false otherwise.


Added:
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/FlowControl.h
      - copied unchanged from r671655, incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
Removed:
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/ConcurrentQueue.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/ais_run
Modified:
    incubator/qpid/branches/qpid.0-10/cpp/src/Makefile.am
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/BlockingQueue.h
    incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/Waitable.h
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/ClientSessionTest.cpp

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/Makefile.am?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/Makefile.am Thu Jul 10 07:44:01 2008
@@ -443,6 +443,7 @@
   qpid/client/Demux.h \
   qpid/client/Dispatcher.h \
   qpid/client/Execution.h \
+  qpid/client/FlowControl.h \
   qpid/client/Future.h \
   qpid/client/FutureCompletion.h \
   qpid/client/FutureResult.h \

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.cpp?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.cpp Thu Jul 10 07:44:01 2008
@@ -31,14 +31,25 @@
 LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
 LocalQueue::~LocalQueue() {}
 
-Message LocalQueue::pop() {
+Message LocalQueue::pop() { return get(); }
+
+Message LocalQueue::get() {
+    Message result;
+    bool ok = get(result, sys::TIME_INFINITE);
+    assert(ok); (void) ok;
+    return result;
+}
+
+bool LocalQueue::get(Message& result, sys::Duration timeout) {
     if (!queue)
         throw ClosedException();
-    FrameSet::shared_ptr content = queue->pop();
+    FrameSet::shared_ptr content;
+    bool ok = queue->pop(content, timeout);
+    if (!ok) return false;
     if (content->isA<MessageTransferBody>()) {
-        Message m(*content);
-        autoAck.ack(m, session);
-        return m;
+        result = Message(*content);
+        autoAck.ack(result, session);
+        return true;
     }
     else
         throw CommandInvalidException(

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.h?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/LocalQueue.h Thu Jul 10 07:44:01 2008
@@ -25,6 +25,7 @@
 #include "qpid/client/Message.h"
 #include "qpid/client/Demux.h"
 #include "qpid/client/AckPolicy.h"
+#include "qpid/sys/Time.h"
 
 namespace qpid {
 namespace client {
@@ -42,7 +43,7 @@
   public:
     /** Create a local queue. Subscribe the local queue to a remote broker
      * queue with a SubscriptionManager.
-     * 
+     *
      * LocalQueue is an alternative to implementing a MessageListener.
      * 
      *@param ackPolicy Policy for acknowledging messages. @see AckPolicy.
@@ -51,14 +52,22 @@
 
     ~LocalQueue();
 
-    /** Pop the next message off the local queue.
+    /** Wait up to timeout for the next message from the local queue.
+     *@param result Set to the message from the queue.
+     *@param timeout wait up this timeout for a message to appear. 
+     *@return true if result was set, false if queue was empty after timeout.
+     */
+    bool get(Message& result, sys::Duration timeout=0);
+
+    /** Get the next message off the local queue, or wait for a
+     * message from the broker queue.
      *@exception ClosedException if subscription has been closed.
      */
-    Message pop();
+    Message get();
 
     /** Synonym for get(). */
-    Message get() { return pop(); }
-    
+    Message pop();
+
     /** Return true if local queue is empty. */
     bool empty() const;
 
@@ -72,10 +81,11 @@
     AckPolicy& getAckPolicy();
 
   private:
-    friend class SubscriptionManager;
     Session session;
     Demux::QueuePtr queue;
     AckPolicy autoAck;
+
+  friend class SubscriptionManager;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.cpp?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.cpp Thu Jul 10 07:44:01 2008
@@ -25,6 +25,7 @@
 #include <qpid/client/Dispatcher.h>
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
 #include <set>
 #include <sstream>
 
@@ -34,35 +35,48 @@
 
 SubscriptionManager::SubscriptionManager(const Session& s)
     : dispatcher(s), session(s),
-      messages(UNLIMITED), bytes(UNLIMITED), window(true),
+      flowControl(UNLIMITED, UNLIMITED, false),
       acceptMode(0), acquireMode(0),
       autoStop(true)
 {}
 
 void SubscriptionManager::subscribeInternal(
-    const std::string& q, const std::string& dest)
+    const std::string& q, const std::string& dest, const FlowControl& fc)
 {
     session.messageSubscribe( 
         arg::queue=q, arg::destination=dest,
         arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
-    setFlowControl(dest, messages, bytes, window); 
+    if (fc.messages || fc.bytes) // No need to set if all 0.
+        setFlowControl(dest, fc);
 }
 
 void SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const std::string& d)
 {
+    subscribe(listener, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+    MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+{
     std::string dest=d.empty() ? q:d;
     dispatcher.listen(dest, &listener, autoAck);
-    return subscribeInternal(q, dest);
+    return subscribeInternal(q, dest, fc);
 }
 
 void SubscriptionManager::subscribe(
     LocalQueue& lq, const std::string& q, const std::string& d)
 {
+    subscribe(lq, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+    LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
+{
     std::string dest=d.empty() ? q:d;
     lq.session=session;
     lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
-    return subscribeInternal(q, dest);
+    return subscribeInternal(q, dest, fc);
 }
 
 void SubscriptionManager::setFlowControl(
@@ -74,14 +88,20 @@
     session.sync();
 }
 
+void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
+    setFlowControl(dest, fc.messages, fc.bytes, fc.window);
+}
+
+void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
+
 void SubscriptionManager::setFlowControl(
     uint32_t messages_,  uint32_t bytes_, bool window_)
 {
-    messages=messages_;
-    bytes=bytes_;
-    window=window_;
+    setFlowControl(FlowControl(messages_, bytes_, window_));
 }
 
+const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
+
 void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
 
 void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
@@ -109,6 +129,15 @@
     dispatcher.stop();
 }
 
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
+    LocalQueue lq;
+    std::string unique = framing::Uuid(true).str();
+    subscribe(lq, queue, FlowControl::messageCredit(1), unique);
+    AutoCancel ac(*this, unique);
+    sync(session).messageFlush(unique);
+    return lq.get(result, timeout);
+}
+
 }} // namespace qpid::client
 
 #endif

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.h?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/client/SubscriptionManager.h Thu Jul 10 07:44:01 2008
@@ -27,8 +27,8 @@
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
 #include <qpid/sys/Runnable.h>
-
 #include <set>
 #include <sstream>
 
@@ -48,13 +48,11 @@
     typedef sys::Mutex::ScopedLock Lock;
     typedef sys::Mutex::ScopedUnlock Unlock;
 
-    void subscribeInternal(const std::string& q, const std::string& dest);
+    void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
     
     qpid::client::Dispatcher dispatcher;
     qpid::client::AsyncSession session;
-    uint32_t messages;
-    uint32_t bytes;
-    bool window;
+    FlowControl flowControl;
     AckPolicy autoAck;
     bool acceptMode;
     bool acquireMode;
@@ -72,6 +70,38 @@
      * 
      *@param listener Listener object to receive messages.
      *@param queue Name of the queue to subscribe to.
+     *@param flow initial FlowControl for the subscription.
+     *@param tag Unique destination tag for the listener.
+     * If not specified, the queue name is used.
+     */
+    void subscribe(MessageListener& listener,
+                   const std::string& queue,
+                   const FlowControl& flow,
+                   const std::string& tag=std::string());
+
+    /**
+     * Subscribe a LocalQueue to receive messages from queue.
+     * 
+     * Incoming messages are stored in the queue for you to retrieve.
+     * 
+     *@param queue Name of the queue to subscribe to.
+     *@param flow initial FlowControl for the subscription.
+     *@param tag Unique destination tag for the listener.
+     * If not specified, the queue name is used.
+     */
+    void subscribe(LocalQueue& localQueue,
+                   const std::string& queue,
+                   const FlowControl& flow,
+                   const std::string& tag=std::string());
+
+    /**
+     * Subscribe a MessagesListener to receive messages from queue.
+     *
+     * Provide your own subclass of MessagesListener to process
+     * incoming messages. It will be called for each message received.
+     * 
+     *@param listener Listener object to receive messages.
+     *@param queue Name of the queue to subscribe to.
      *@param tag Unique destination tag for the listener.
      * If not specified, the queue name is used.
      */
@@ -92,6 +122,15 @@
                    const std::string& queue,
                    const std::string& tag=std::string());
 
+
+    /** Get a single message from a queue.
+     *@param result is set to the message from the queue.
+     *@
+     *@param timeout wait up this timeout for a message to appear. 
+     *@return true if result was set, false if no message available after timeout.
+     */
+    bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
+
     /** Cancel a subscription. */
     void cancel(const std::string tag);
 
@@ -107,9 +146,17 @@
     /** Cause run() to return */
     void stop();
 
-
     static const uint32_t UNLIMITED=0xFFFFFFFF;
 
+    /** Set the flow control for destination. */
+    void setFlowControl(const std::string& destintion, const FlowControl& flow);
+
+    /** Set the default initial flow control for subscriptions that do not specify it. */
+    void setFlowControl(const FlowControl& flow);
+
+    /** Get the default flow control for new subscriptions that do not specify it. */
+    const FlowControl& getFlowControl() const;
+
     /** Set the flow control for destination tag.
      *@param tag: name of the destination.
      *@param messages: message credit.
@@ -148,6 +195,15 @@
      AckPolicy& getAckPolicy();
 };
 
+/** AutoCancel cancels a subscription in its destructor */
+class AutoCancel {
+  public:
+    AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+    ~AutoCancel() { sm.cancel(tag); }
+  private:
+    SubscriptionManager& sm;
+    std::string tag;
+};
 
 }} // namespace qpid::client
 

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/BlockingQueue.h?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/BlockingQueue.h Thu Jul 10 07:44:01 2008
@@ -35,47 +35,45 @@
 template <class T>
 class BlockingQueue
 {
-    mutable sys::Waitable lock;
+    mutable sys::Waitable waitable;
     std::queue<T> queue;
-    bool closed;
 
 public:
-    BlockingQueue() : closed(false) {}
+    BlockingQueue() {}
     ~BlockingQueue() { close(); }
 
-    /** Block until there is a value to pop */
-    T pop()
-    {
-        Waitable::ScopedLock l(lock);
-        if (!queueWait()) throw ClosedException();
-        return popInternal();
-    }
-
-    /** Non-blocking pop. If there is a value set outValue and return
-     * true, else return false;
+    /** Pop from the queue, block up to timeout if empty.
+     *@param result Set to value popped from queue.
+     *@param timeout Defaults to infinite.
+     *@return true if result was set, false if queue empty after timeout.
      */
-    bool tryPop(T& outValue) {
-        Waitable::ScopedLock l(lock);
+    bool pop(T& result, Duration timeout=TIME_INFINITE) {
+        Mutex::ScopedLock l(waitable);
+        {
+            Waitable::ScopedWait w(waitable);
+            AbsTime deadline(now(),timeout);
+            while (queue.empty() && deadline > now()) waitable.wait(deadline);
+        }
         if (queue.empty()) return false;
-        outValue = popInternal();
+        result = queue.front();
+        queue.pop();
+        if (!queue.empty())
+            waitable.notify();  // Notify another waiter.
         return true;
     }
 
-    /** Non-blocking pop. If there is a value return it, else return
-     * valueIfEmpty.
-     */
-    T tryPop(const T& valueIfEmpty=T()) {
-        T result=valueIfEmpty;
-        tryPop(result);
+    T pop() {
+        T result;
+        bool ok = pop(result);
+        assert(ok); (void) ok;  // Infinite wait.
         return result;
     }
-
+        
     /** Push a value onto the queue */
-    void push(const T& t)
-    {
-        Waitable::ScopedLock l(lock);
+    void push(const T& t) {
+        Mutex::ScopedLock l(waitable);
         queue.push(t);
-        queueNotify(0);
+        waitable.notify();      // Notify a waiter.
     }
 
     /**
@@ -84,56 +82,33 @@
      */ 
     void close(const ExceptionHolder& ex=ExceptionHolder(new ClosedException()))
     {
-        Waitable::ScopedLock l(lock);
-        if (!closed) {
-            lock.setException(ex);
-            closed = true;
-            lock.notifyAll();
-            lock.waitWaiters(); // Ensure no threads are still waiting.
+        Mutex::ScopedLock l(waitable);
+        if (!waitable.hasException()) {
+            waitable.setException(ex);
+            waitable.notifyAll();
+            waitable.waitWaiters(); // Ensure no threads are still waiting.
         }
     }
 
     /** Open a closed queue. */
     void open() {
-        Waitable::ScopedLock l(lock);
-        closed=false;
+        Mutex::ScopedLock l(waitable);
+        waitable.resetException();
     }
 
     bool isClosed() const { 
-        Waitable::ScopedLock l(lock);
-        return closed;
+        Mutex::ScopedLock l(waitable);
+        return waitable.hasException();
     }
 
     bool empty() const {
-        Waitable::ScopedLock l(lock);
+        Mutex::ScopedLock l(waitable);
         return queue.empty();
     }    
     size_t size() const {
-        Waitable::ScopedLock l(lock);
+        Mutex::ScopedLock l(waitable);
         return queue.size();
     }    
-
-  private:
-
-    void queueNotify(size_t ignore) {
-        if (!queue.empty() && lock.hasWaiters()>ignore)
-            lock.notify();      // Notify another waiter.
-    }
-
-    bool queueWait() {
-        Waitable::ScopedWait w(lock);
-        while (!closed && queue.empty())
-            lock.wait();
-        return !queue.empty();
-    }
-
-    T popInternal() {
-        T t=queue.front();
-        queue.pop();
-        queueNotify(1);
-        return t;
-    }
-    
 };
 
 }}

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/Waitable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/Waitable.h?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/Waitable.h (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/qpid/sys/Waitable.h Thu Jul 10 07:44:01 2008
@@ -76,6 +76,12 @@
         
     }
 
+    /** True if the waitable has an exception */
+    bool hasException() const { return exception; }
+
+    /** Clear the exception if any */
+    void resetException() { exception.reset(); }
+
     /** Throws an exception if one is set before or during the wait. */
     void wait() {
         ExCheck e(exception);
@@ -88,8 +94,6 @@
         return Monitor::wait(absoluteTime);
     }
 
-    ExceptionHolder exception;
-
   private:
     struct ExCheck {
         const ExceptionHolder& exception;
@@ -98,6 +102,8 @@
     };
         
     size_t waiters;
+    ExceptionHolder exception;
+
   friend struct ScopedWait;
 };
 

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/tests/ClientSessionTest.cpp?rev=675598&r1=675597&r2=675598&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/tests/ClientSessionTest.cpp Thu Jul 10 07:44:01 2008
@@ -41,6 +41,7 @@
 using namespace qpid::framing;
 using namespace qpid;
 using qpid::sys::Monitor;
+using qpid::sys::TIME_SEC;
 using std::string;
 using std::cout;
 using std::endl;
@@ -203,7 +204,7 @@
     ClientSessionFixture fix;
     SimpleListener mylistener;
     fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
-    fix.subs.subscribe(mylistener, "myq", "myq");
+    fix.subs.subscribe(mylistener, "myq");
     sys::Thread runner(fix.subs);//start dispatcher thread
     string data("msg");
     Message msg(data, "myq");
@@ -222,5 +223,34 @@
     }
 }
 
+QPID_AUTO_TEST_CASE(testLocalQueue) {
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true);
+    LocalQueue lq;
+    fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false));
+    fix.session.messageTransfer(content=Message("foo0", "lq"));
+    fix.session.messageTransfer(content=Message("foo1", "lq"));
+    fix.session.messageTransfer(content=Message("foo2", "lq"));
+    BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
+    BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
+    BOOST_CHECK(lq.empty());    // Credit exhausted.
+    fix.subs.setFlowControl("lq", FlowControl::unlimited());
+    BOOST_CHECK_EQUAL("foo2", lq.pop().getData());    
+}
+
+QPID_AUTO_TEST_CASE(testGet) {
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
+    fix.session.messageTransfer(content=Message("foo0", "getq"));
+    fix.session.messageTransfer(content=Message("foo1", "getq"));
+    Message got;
+    BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+    BOOST_CHECK_EQUAL("foo0", got.getData());
+    BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
+    BOOST_CHECK_EQUAL("foo1", got.getData());
+    BOOST_CHECK(!fix.subs.get(got, "getq"));
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
+