You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/07 17:06:32 UTC

svn commit: r592803 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/ qpid/client/ qpid/sys/ tests/

Author: aconway
Date: Wed Nov  7 08:06:31 2007
New Revision: 592803

URL: http://svn.apache.org/viewvc?rev=592803&view=rev
Log:

Added LocalQueue subscriptions. LocalQueue::pop() provides a "pull"
 alternative to the MessageListener::received() "push" API.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Wed Nov  7 08:06:31 2007
@@ -46,4 +46,6 @@
 
 const char* Exception::what() const throw() { return str().c_str(); }
 
+const std::string ClosedException::CLOSED_MESSAGE("Closed");
+
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Wed Nov  7 08:06:31 2007
@@ -61,6 +61,11 @@
         : Exception(message), code(code_) {}
 };
 
-} // namespace qpid
+struct ClosedException : public Exception {
+    static const std::string CLOSED_MESSAGE;
+    ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {}
+};
 
+} // namespace qpid
+ 
 #endif  /*!_Exception_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Wed Nov  7 08:06:31 2007
@@ -179,7 +179,7 @@
 bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
     string tag = "get-handler";
     ScopedDivert handler(tag, session.execution().getDemux());
-    Demux::Queue& incoming = handler.getQueue();
+    Demux::QueuePtr incoming = handler.getQueue();
 
     session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
     session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
@@ -189,7 +189,7 @@
     session.messageCancel(tag);
 
     FrameSet::shared_ptr p;
-    if (incoming.tryPop(p)) {
+    if (incoming->tryPop(p)) {
         msg.populate(*p);
         if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
         return true;
@@ -265,7 +265,7 @@
                 QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());                        
             }
         }
-    } catch (const sys::QueueClosed&) {}
+    } catch (const ClosedException&) {}
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.cpp Wed Nov  7 08:06:31 2007
@@ -37,7 +37,7 @@
 
 ScopedDivert::ScopedDivert(const std::string& _dest, Demux& _demuxer) : dest(_dest), demuxer(_demuxer) 
 {
-    queue = &(demuxer.add(dest, ByTransferDest(dest)));
+    queue = demuxer.add(dest, ByTransferDest(dest));
 }
 
 ScopedDivert::~ScopedDivert() 
@@ -45,9 +45,9 @@
     demuxer.remove(dest); 
 }
 
-Demux::Queue& ScopedDivert::getQueue()
+Demux::QueuePtr ScopedDivert::getQueue()
 {
-    return *queue;
+    return queue;
 }
 
 void Demux::handle(framing::FrameSet::shared_ptr frameset)
@@ -61,7 +61,7 @@
         }
     }
     if (!matched) {
-        defaultQueue.push(frameset);
+        defaultQueue->push(frameset);
     }
 }
 
@@ -71,17 +71,17 @@
     for (iterator i = records.begin(); i != records.end(); i++) {
         i->queue->close();
     }
-    defaultQueue.close();
+    defaultQueue->close();
 }
 
-Demux::Queue& Demux::add(const std::string& name, Condition condition)
+Demux::QueuePtr Demux::add(const std::string& name, Condition condition)
 {
     sys::Mutex::ScopedLock l(lock);
     iterator i = std::find_if(records.begin(), records.end(), Find(name));
     if (i == records.end()) {        
         Record r(name, condition);
         records.push_back(r);
-        return *(r.queue);
+        return r.queue;
     } else {
         throw Exception("Queue already exists for " + name);
     }
@@ -93,17 +93,17 @@
     records.remove_if(Find(name));
 }
 
-Demux::Queue& Demux::get(const std::string& name)
+Demux::QueuePtr Demux::get(const std::string& name)
 {
     sys::Mutex::ScopedLock l(lock);
     iterator i = std::find_if(records.begin(), records.end(), Find(name));
     if (i == records.end()) {
         throw Exception("No queue for " + name);
     }
-    return *(i->queue);
+    return i->queue;
 }
 
-Demux::Queue& Demux::getDefault()
+Demux::QueuePtr Demux::getDefault()
 {
     return defaultQueue;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Demux.h Wed Nov  7 08:06:31 2007
@@ -45,16 +45,19 @@
 public:
     typedef boost::function<bool(const framing::FrameSet&)> Condition;
     typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
+    typedef boost::shared_ptr<Queue> QueuePtr;
 
+    Demux() : defaultQueue(new Queue()) {}
+    
     void handle(framing::FrameSet::shared_ptr);
     void close();
 
-    Queue& add(const std::string& name, Condition);
+    QueuePtr add(const std::string& name, Condition);
     void remove(const std::string& name);
-    Queue& get(const std::string& name);
-    Queue& getDefault();
+    QueuePtr get(const std::string& name);
+    QueuePtr getDefault();
+
 private:
-    typedef boost::shared_ptr<Queue> QueuePtr;
     struct Record
     {
         const std::string name;
@@ -66,7 +69,7 @@
 
     sys::Mutex lock;
     std::list<Record> records;
-    Queue defaultQueue;
+    QueuePtr defaultQueue;
 
     typedef std::list<Record>::iterator iterator;
 
@@ -82,15 +85,14 @@
 {
     const std::string dest;
     Demux& demuxer;
-    Demux::Queue* queue;
+    Demux::QueuePtr queue;
 public:
     ScopedDivert(const std::string& dest, Demux& demuxer);
     ~ScopedDivert();
-    Demux::Queue& getQueue();
+    Demux::QueuePtr getQueue();
 };
 
-}
-}
+}} // namespace qpid::client
 
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Nov  7 08:06:31 2007
@@ -62,14 +62,14 @@
 
 void Dispatcher::run()
 {    
-    sys::BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? 
+    Demux::QueuePtr q = queue.empty() ? 
         session.execution().getDemux().getDefault() : 
         session.execution().getDemux().get(queue); 
 
     startRunning();
     stopped = false;
     while (!isStopped()) {
-        FrameSet::shared_ptr content = q.pop();
+        FrameSet::shared_ptr content = q->pop();
         if (content->isA<MessageTransferBody>()) {
             Message msg(*content);
             Subscriber::shared_ptr listener = find(msg.getDestination());

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=592803&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Wed Nov  7 08:06:31 2007
@@ -0,0 +1,52 @@
+#ifndef QPID_CLIENT_LOCALQUEUE_H
+#define QPID_CLIENT_LOCALQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/Message.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/BlockingQueue.h"
+
+namespace qpid {
+namespace client {
+
+/**
+ * Local representation of a remote queue.
+ */
+class LocalQueue
+{
+  public:
+    LocalQueue(BlockingQueue& q) : queue(q) {}
+    ~LocalQueue();
+
+    /** Pop the next message off the queue.
+     *@exception ClosedException if subscription has been closed.
+     */
+    Message pop() { reurn queue->pop(); }
+
+  private:
+    BlockingQueue& queue;
+};
+
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_LOCALQUEUE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Nov  7 08:06:31 2007
@@ -162,7 +162,7 @@
 
 FrameSet::shared_ptr SessionCore::get() { // user thread
     // No lock here: pop does a blocking wait.
-    return l3.getDemux().getDefault().pop();
+    return l3.getDemux().getDefault()->pop();
 }
 
 void SessionCore::open(uint32_t detachedLifetime) { // user thread

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Wed Nov  7 08:06:31 2007
@@ -29,8 +29,6 @@
 namespace qpid {
 namespace sys {
 
-struct QueueClosed {};
-
 template <class T>
 class BlockingQueue
 {
@@ -46,7 +44,7 @@
     T pop()
     {
         Waitable::ScopedLock l(lock);
-        if (!queueWait()) throw QueueClosed();
+        if (!queueWait()) throw ClosedException();
         return popInternal();
     }
 
@@ -78,7 +76,7 @@
     }
 
     /**
-     * Close the queue. Throws QueueClosed in threads waiting in pop().
+     * Close the queue. Throws ClosedException in threads waiting in pop().
      * Blocks till all waiting threads have been notified.
      */ 
     void close()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h?rev=592803&r1=592802&r2=592803&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h Wed Nov  7 08:06:31 2007
@@ -83,7 +83,7 @@
                         QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
                 }
             }
-            catch (const sys::QueueClosed&) {
+            catch (const ClosedException&) {
                 return;
             }
         }