You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/03 23:18:19 UTC

svn commit: r525282 - in /incubator/qpid/trunk/qpid/cpp/src/client: BasicMessageChannel.cpp BasicMessageChannel.h IncomingMessage.cpp IncomingMessage.h

Author: aconway
Date: Tue Apr  3 14:18:17 2007
New Revision: 525282

URL: http://svn.apache.org/viewvc?view=rev&rev=525282
Log:

Moved BasicMessage::WaitableDestination to IncomingMessage::WaitableDestination  so it can be shared by Basic and Message implementations.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h
    incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h

Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp Tue Apr  3 14:18:17 2007
@@ -45,60 +45,10 @@
 const std::string BASIC_REF("__basic_reference__");
 }
     
-class BasicMessageChannel::WaitableDestination :
-        public IncomingMessage::Destination
-{
-  public:
-    WaitableDestination() : shutdownFlag(false) {}
-    void message(const Message& msg) {
-        Mutex::ScopedLock l(monitor);
-        queue.push(msg);
-        monitor.notify();   
-    }
-        
-    void empty() {
-        Mutex::ScopedLock l(monitor);
-        queue.push(Empty());
-        monitor.notify();
-    }
-    
-    bool wait(Message& msgOut) {
-        Mutex::ScopedLock l(monitor);
-        while (queue.empty() && !shutdownFlag)
-            monitor.wait();
-        if (shutdownFlag)
-            return false;
-        Message* msg = boost::get<Message>(&queue.front());
-        bool success = msg;
-        if (success) 
-            msgOut=*msg;
-        queue.pop();
-        if (!queue.empty())
-            monitor.notify();   // Wake another waiter.
-        return success;
-    }
-
-    void shutdown() {
-        Mutex::ScopedLock l(monitor);
-        shutdownFlag = true;
-        monitor.notifyAll();
-    }
-
-  private:
-    struct Empty {};
-    typedef boost::variant<Message,Empty> Item;
-    sys::Monitor monitor;
-    std::queue<Item> queue;
-    bool shutdownFlag;
-};
- 
-
 BasicMessageChannel::BasicMessageChannel(Channel& ch)
-    : channel(ch), returnsHandler(0),
-      destGet(new WaitableDestination()),
-      destDispatch(new WaitableDestination())
+    : channel(ch), returnsHandler(0)
 {
-    incoming.addDestination(BASIC_RETURN, *destDispatch);
+    incoming.addDestination(BASIC_RETURN, destDispatch);
 }
 
 void BasicMessageChannel::consume(
@@ -162,8 +112,8 @@
         consumersCopy = consumers;
         consumers.clear();
     }
-    destGet->shutdown();
-    destDispatch->shutdown();
+    destGet.shutdown();
+    destDispatch.shutdown();
     for (ConsumerMap::iterator i=consumersCopy.begin();
          i  != consumersCopy.end(); ++i)
     {
@@ -181,10 +131,10 @@
     Message& msg, const Queue& queue, AckMode ackMode)
 {
     // Prepare for incoming response
-    incoming.addDestination(BASIC_GET, *destGet);
+    incoming.addDestination(BASIC_GET, destGet);
     channel.send(
         new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
-    bool got = destGet->wait(msg);
+    bool got = destGet.wait(msg);
     return got;
 }
 
@@ -273,7 +223,7 @@
           }
           // FIXME aconway 2007-03-23: Integrate consumer & destination
           // maps.
-          incoming.addDestination(tag, *destDispatch);
+          incoming.addDestination(tag, destDispatch);
           return;
       }
     }
@@ -342,7 +292,7 @@
     while(channel.isOpen()) {
         try {
             Message msg;
-            bool gotMessge = destDispatch->wait(msg);
+            bool gotMessge = destDispatch.wait(msg);
             if (gotMessge) {
                 if(msg.getDestination() == BASIC_RETURN) {
                     ReturnedMessageHandler* handler=0;

Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h Tue Apr  3 14:18:17 2007
@@ -63,7 +63,6 @@
 
   private:
 
-    class WaitableDestination;
     struct Consumer{
         MessageListener* listener;
         AckMode ackMode;
@@ -80,8 +79,8 @@
     uint64_t incoming_size;
     ConsumerMap consumers ;
     ReturnedMessageHandler* returnsHandler;
-    boost::scoped_ptr<WaitableDestination> destGet;
-    boost::scoped_ptr<WaitableDestination> destDispatch;
+    IncomingMessage::WaitableDestination destGet;
+    IncomingMessage::WaitableDestination destDispatch;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp Tue Apr  3 14:18:17 2007
@@ -32,6 +32,44 @@
 
 IncomingMessage::Destination::~Destination() {}
 
+    
+IncomingMessage::WaitableDestination::WaitableDestination()
+    : shutdownFlag(false) {}
+
+void IncomingMessage::WaitableDestination::message(const Message& msg) {
+    Mutex::ScopedLock l(monitor);
+    queue.push(msg);
+    monitor.notify();   
+}
+        
+void IncomingMessage::WaitableDestination::empty() {
+    Mutex::ScopedLock l(monitor);
+    queue.push(Empty());
+    monitor.notify();
+}
+    
+bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
+    Mutex::ScopedLock l(monitor);
+    while (queue.empty() && !shutdownFlag)
+        monitor.wait();
+    if (shutdownFlag)
+        return false;
+    Message* msg = boost::get<Message>(&queue.front());
+    bool success = msg;
+    if (success) 
+        msgOut=*msg;
+    queue.pop();
+    if (!queue.empty())
+        monitor.notify();   // Wake another waiter.
+    return success;
+}
+
+void IncomingMessage::WaitableDestination::shutdown() {
+    Mutex::ScopedLock l(monitor);
+    shutdownFlag = true;
+    monitor.notifyAll();
+}
+
 void IncomingMessage::openReference(const std::string& name) {
     Mutex::ScopedLock l(lock);
     if (references.find(name) != references.end())

Modified: incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h Tue Apr  3 14:18:17 2007
@@ -21,10 +21,11 @@
  * under the License.
  *
  */
-#include "../sys/Mutex.h"
+#include "../sys/Monitor.h"
 #include <map>
+#include <queue>
 #include <vector>
-
+#include <boost/variant.hpp>
 
 namespace qpid {
 namespace client {
@@ -59,6 +60,27 @@
         /** Notify destination of queue-empty contition */
         virtual void empty() = 0;
     };
+
+
+    /** A destination that a thread can wait on till a message arrives. */
+    class WaitableDestination : public Destination
+    {
+      public:
+        WaitableDestination();
+        void message(const Message& msg);        
+        void empty();
+        /** Wait till message() or empty() is called. True for message() */
+        bool wait(Message& msgOut);
+        void shutdown();
+
+      private:
+        struct Empty {};
+        typedef boost::variant<Message,Empty> Item;
+        sys::Monitor monitor;
+        std::queue<Item> queue;
+        bool shutdownFlag;
+    };
+ 
 
 
     /** Add a reference. Throws if already open. */