You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/03 23:18:19 UTC
svn commit: r525282 - in /incubator/qpid/trunk/qpid/cpp/src/client:
BasicMessageChannel.cpp BasicMessageChannel.h IncomingMessage.cpp
IncomingMessage.h
Author: aconway
Date: Tue Apr 3 14:18:17 2007
New Revision: 525282
URL: http://svn.apache.org/viewvc?view=rev&rev=525282
Log:
Moved BasicMessageChannel::WaitableDestination to IncomingMessage::WaitableDestination so it can be shared by the Basic and Message implementations.
Modified:
incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h
incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h
Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.cpp Tue Apr 3 14:18:17 2007
@@ -45,60 +45,10 @@
const std::string BASIC_REF("__basic_reference__");
}
-class BasicMessageChannel::WaitableDestination :
- public IncomingMessage::Destination
-{
- public:
- WaitableDestination() : shutdownFlag(false) {}
- void message(const Message& msg) {
- Mutex::ScopedLock l(monitor);
- queue.push(msg);
- monitor.notify();
- }
-
- void empty() {
- Mutex::ScopedLock l(monitor);
- queue.push(Empty());
- monitor.notify();
- }
-
- bool wait(Message& msgOut) {
- Mutex::ScopedLock l(monitor);
- while (queue.empty() && !shutdownFlag)
- monitor.wait();
- if (shutdownFlag)
- return false;
- Message* msg = boost::get<Message>(&queue.front());
- bool success = msg;
- if (success)
- msgOut=*msg;
- queue.pop();
- if (!queue.empty())
- monitor.notify(); // Wake another waiter.
- return success;
- }
-
- void shutdown() {
- Mutex::ScopedLock l(monitor);
- shutdownFlag = true;
- monitor.notifyAll();
- }
-
- private:
- struct Empty {};
- typedef boost::variant<Message,Empty> Item;
- sys::Monitor monitor;
- std::queue<Item> queue;
- bool shutdownFlag;
-};
-
-
BasicMessageChannel::BasicMessageChannel(Channel& ch)
- : channel(ch), returnsHandler(0),
- destGet(new WaitableDestination()),
- destDispatch(new WaitableDestination())
+ : channel(ch), returnsHandler(0)
{
- incoming.addDestination(BASIC_RETURN, *destDispatch);
+ incoming.addDestination(BASIC_RETURN, destDispatch);
}
void BasicMessageChannel::consume(
@@ -162,8 +112,8 @@
consumersCopy = consumers;
consumers.clear();
}
- destGet->shutdown();
- destDispatch->shutdown();
+ destGet.shutdown();
+ destDispatch.shutdown();
for (ConsumerMap::iterator i=consumersCopy.begin();
i != consumersCopy.end(); ++i)
{
@@ -181,10 +131,10 @@
Message& msg, const Queue& queue, AckMode ackMode)
{
// Prepare for incoming response
- incoming.addDestination(BASIC_GET, *destGet);
+ incoming.addDestination(BASIC_GET, destGet);
channel.send(
new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
- bool got = destGet->wait(msg);
+ bool got = destGet.wait(msg);
return got;
}
@@ -273,7 +223,7 @@
}
// FIXME aconway 2007-03-23: Integrate consumer & destination
// maps.
- incoming.addDestination(tag, *destDispatch);
+ incoming.addDestination(tag, destDispatch);
return;
}
}
@@ -342,7 +292,7 @@
while(channel.isOpen()) {
try {
Message msg;
- bool gotMessge = destDispatch->wait(msg);
+ bool gotMessge = destDispatch.wait(msg);
if (gotMessge) {
if(msg.getDestination() == BASIC_RETURN) {
ReturnedMessageHandler* handler=0;
Modified: incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/BasicMessageChannel.h Tue Apr 3 14:18:17 2007
@@ -63,7 +63,6 @@
private:
- class WaitableDestination;
struct Consumer{
MessageListener* listener;
AckMode ackMode;
@@ -80,8 +79,8 @@
uint64_t incoming_size;
ConsumerMap consumers ;
ReturnedMessageHandler* returnsHandler;
- boost::scoped_ptr<WaitableDestination> destGet;
- boost::scoped_ptr<WaitableDestination> destDispatch;
+ IncomingMessage::WaitableDestination destGet;
+ IncomingMessage::WaitableDestination destDispatch;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.cpp Tue Apr 3 14:18:17 2007
@@ -32,6 +32,44 @@
IncomingMessage::Destination::~Destination() {}
+
+IncomingMessage::WaitableDestination::WaitableDestination()
+ : shutdownFlag(false) {}
+
+void IncomingMessage::WaitableDestination::message(const Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ queue.push(msg);
+ monitor.notify();
+}
+
+void IncomingMessage::WaitableDestination::empty() {
+ Mutex::ScopedLock l(monitor);
+ queue.push(Empty());
+ monitor.notify();
+}
+
+bool IncomingMessage::WaitableDestination::wait(Message& msgOut) {
+ Mutex::ScopedLock l(monitor);
+ while (queue.empty() && !shutdownFlag)
+ monitor.wait();
+ if (shutdownFlag)
+ return false;
+ Message* msg = boost::get<Message>(&queue.front());
+ bool success = msg;
+ if (success)
+ msgOut=*msg;
+ queue.pop();
+ if (!queue.empty())
+ monitor.notify(); // Wake another waiter.
+ return success;
+}
+
+void IncomingMessage::WaitableDestination::shutdown() {
+ Mutex::ScopedLock l(monitor);
+ shutdownFlag = true;
+ monitor.notifyAll();
+}
+
void IncomingMessage::openReference(const std::string& name) {
Mutex::ScopedLock l(lock);
if (references.find(name) != references.end())
Modified: incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h?view=diff&rev=525282&r1=525281&r2=525282
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/IncomingMessage.h Tue Apr 3 14:18:17 2007
@@ -21,10 +21,11 @@
* under the License.
*
*/
-#include "../sys/Mutex.h"
+#include "../sys/Monitor.h"
#include <map>
+#include <queue>
#include <vector>
-
+#include <boost/variant.hpp>
namespace qpid {
namespace client {
@@ -59,6 +60,27 @@
/** Notify destination of queue-empty condition */
virtual void empty() = 0;
};
+
+
+ /** A destination that a thread can wait on till a message arrives. */
+ class WaitableDestination : public Destination
+ {
+ public:
+ WaitableDestination();
+ void message(const Message& msg);
+ void empty();
+ /** Wait till message() or empty() is called. True for message() */
+ bool wait(Message& msgOut);
+ void shutdown();
+
+ private:
+ struct Empty {};
+ typedef boost::variant<Message,Empty> Item;
+ sys::Monitor monitor;
+ std::queue<Item> queue;
+ bool shutdownFlag;
+ };
+
/** Add a reference. Throws if already open. */