You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/02/21 20:25:47 UTC
svn commit: r510161 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/
lib/client/ lib/common/ lib/common/framing/ lib/common/sys/ tests/
Author: aconway
Date: Wed Feb 21 11:25:45 2007
New Revision: 510161
URL: http://svn.apache.org/viewvc?view=rev&rev=510161
Log:
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h: Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/ .cpp():
- added locking for consumers map and other member access.
- refactored implementations of Basic get, deliver, return:
most logic now encapsulted in IncomingMessage class.
- fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
- const correctness & API convenience fixes.
- getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
- Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
- Encapsulate *all* incoming message handling for client.
- Moved handling of BasicGetOk to IncomingMessage to fix race.
- Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
- added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
- added missing throwSelf implementations.
- added ShutdownException as general purpose shut-down indicator.
- added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
- Condition variable abstraction extracted from Monitor for situations
where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
- Test incoming message transfer, get, consume etc.
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h (with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h
incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Content.h Wed Feb 21 11:25:45 2007
@@ -42,8 +42,6 @@
virtual ~Content(){}
/** Add a block of data to the content */
- // FIXME aconway 2007-02-07:
- // virtual void add(const DataBlock& data) = 0;
virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
/** Total size of content in bytes */
@@ -54,8 +52,6 @@
* Subdivide blocks if necessary to ensure each block is
* <= framesize bytes long.
*/
- // FIXME aconway 2007-02-07:
- // virtual void send(SendFn send, u_int32_t framesize) = 0;
virtual void send(framing::ChannelAdapter& channel, u_int32_t framesize) = 0;
//FIXME aconway 2007-02-07: This is inconsistently implemented
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.cpp Wed Feb 21 11:25:45 2007
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <iostream>
#include <ClientChannel.h>
#include <sys/Monitor.h>
#include <ClientMessage.h>
@@ -29,17 +30,14 @@
// handling of errors that should close the connection or the channel.
// Make sure the user thread receives a connection in each case.
//
-
-using namespace boost; //to use dynamic_pointer_cast
+using namespace std;
+using namespace boost;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-const std::string Channel::OK("OK");
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0),
- incoming(0),
prefetch(_prefetch),
transactional(_transactional)
{ }
@@ -106,8 +104,8 @@
ConnectionRedirectBody::shared_ptr redirect(
shared_polymorphic_downcast<ConnectionRedirectBody>(
responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost()
- << std::endl;
+ cout << "Received redirection to " << redirect->getHost()
+ << endl;
} else {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
}
@@ -183,11 +181,11 @@
Queue& queue, std::string& tag, MessageListener* listener,
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
- string q = queue.getName();
sendAndReceiveSync<BasicConsumeOkBody>(
synch,
new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable()));
if (synch) {
BasicConsumeOkBody::shared_ptr response =
@@ -195,90 +193,78 @@
responses.getResponse());
tag = response->getConsumerTag();
}
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ // FIXME aconway 2007-02-20: Race condition!
+ // We could receive the first message for the consumer
+ // before we create the consumer below.
+ // Move consumer creation to handler for BasicConsumeOkBody
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
}
void Channel::cancel(const std::string& tag, bool synch) {
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end()) {
- Consumer& c = i->second;
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(version, tag, !synch));
- consumers.erase(tag);
- }
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
}
void Channel::cancelAll(){
- while(!consumers.empty()) {
- Consumer c = consumers.begin()->second;
- consumers.erase(consumers.begin());
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
&& c.lastDeliveryTag > 0)
{
- // Let exceptions propagate, if one fails no point
- // trying the rest. NB no memory leaks if we do,
- // ConsumerMap holds values, not pointers.
- //
send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
}
}
-void Channel::retrieve(Message& msg){
- Monitor::ScopedLock l(retrievalMonitor);
- while(retrieved == 0){
- retrievalMonitor.wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- msg.data = retrieved->getData();
- delete retrieved;
- retrieved = 0;
-}
-
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
- string name = queue.getName();
- responses.expect();
- send(new BasicGetBody(version, 0, name, ackMode));
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(response->isA<BasicGetOkBody>()) {
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- // FIXME aconway 2007-01-26: close the connection? the channel?
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(response->isA<BasicGetEmptyBody>()){
- return false;
- }else{
- // FIXME aconway 2007-01-26: must close the connection.
- THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
- }
+ // Expect a message starting with a BasicGetOk
+ incoming.startGet();
+ send(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ return incoming.waitGet(msg);
}
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- // FIXME aconway 2007-01-30: Rework for message class.
-
- string e = exchange.getName();
+void Channel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ // FIXME aconway 2007-01-30: Rework for 0-9 message class.
+ const string e = exchange.getName();
string key = routingKey;
send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
send(msg.header);
-
+ string data = msg.getData();
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
@@ -312,30 +298,30 @@
{
//channel.flow, channel.close, basic.deliver, basic.return or a
//response to a synchronous request
- if(responses.isWaiting()){
+ if(responses.isWaiting()) {
responses.signalResponse(body);
- }else if(body->isA<BasicDeliverBody>()) {
- if(incoming != 0){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(body->isA<BasicReturnBody>()){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(body->isA<ChannelCloseBody>()){
+ return;
+ }
+
+ if(body->isA<BasicDeliverBody>()
+ || body->isA<BasicReturnBody>()
+ || body->isA<BasicGetOkBody>()
+ || body->isA<BasicGetEmptyBody>())
+
+ {
+ incoming.add(body);
+ return;
+ }
+ else if(body->isA<ChannelCloseBody>()) {
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
- }else if(body->isA<ChannelFlowBody>()){
- // TODO aconway 2007-01-24:
- }else if(body->isA<ConnectionCloseBody>()){
+ }
+ else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24: not implemented yet.
+ }
+ else if(body->isA<ConnectionCloseBody>()){
connection->close();
- }else{
+ }
+ else {
connection->close(
504, "Unrecognised method",
body->amqpClassId(), body->amqpMethodId());
@@ -343,31 +329,13 @@
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
-
+
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
}
@@ -376,35 +344,6 @@
dispatcher = Thread(this);
}
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- Monitor::ScopedLock l(retrievalMonitor);
- if(incoming->isResponse()){
- retrieved = incoming;
- retrievalMonitor.notify();
- }else{
- messages.push(incoming);
- dispatchMonitor.notify();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && isOpen()){
- dispatchMonitor.wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
consumer.lastDeliveryTag = msg.getDeliveryTag();
@@ -412,8 +351,6 @@
//allow registered listener to handle the message
consumer.listener->received(msg);
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
if(isOpen()){
bool multiple(false);
switch(consumer.ackMode){
@@ -432,35 +369,53 @@
//a transaction until it commits.
}
-void Channel::dispatch(){
- while(isOpen()){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- msg.data = incomingMsg->getData();
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
+void Channel::run() {
+ while(isOpen()) {
+ try {
+ Message msg = incoming.waitDispatch();
+ if(msg.getMethod()->isA<BasicReturnBody>()) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
}
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers.find(tag) == consumers.end())
- std::cout << "Unknown consumer: " << tag << std::endl;
- else
- deliver(consumers[tag], msg);
+ if(handler == 0) {
+ // TODO aconway 2007-02-20: proper logging.
+ cout << "Message returned: " << msg.getData() << endl;
+ }
+ else
+ handler->returned(msg);
+ }
+ else {
+ BasicDeliverBody::shared_ptr deliverBody =
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(
+ msg.getMethod());
+ std::string tag = deliverBody->getConsumerTag();
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" + tag);
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
}
- delete incomingMsg;
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::Channel::run() terminated by: " << e.toString()
+ << "(" << typeid(e).name() << ")" << endl;
}
}
}
void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
returnsHandler = handler;
}
@@ -469,13 +424,17 @@
u_int16_t code, const std::string& text,
ClassId classId, MethodId methodId)
{
- if (getId() != 0 && isOpen()) {
+ if (isOpen()) {
try {
- sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(version, code, text, classId, methodId));
- cancelAll();
+ if (getId() != 0) {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(
+ version, code, text, classId, methodId));
+ }
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
} catch (...) {
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
throw;
}
@@ -491,14 +450,13 @@
}
void Channel::closeInternal() {
- assert(isOpen());
+ if (isOpen());
{
- Monitor::ScopedLock l(dispatchMonitor);
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ cancelAll();
+ incoming.shutdown();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
}
dispatcher.join();
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientChannel.h Wed Feb 21 11:25:45 2007
@@ -89,19 +89,12 @@
u_int64_t lastDeliveryTag;
};
typedef std::map<std::string, Consumer> ConsumerMap;
- typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
- static const std::string OK;
-
+ sys::Mutex lock;
Connection* connection;
sys::Thread dispatcher;
- IncomingMethods incomingMethods;
- IncomingMessage* incoming;
+ IncomingMessage incoming;
ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- sys::Monitor dispatchMonitor;
- sys::Monitor retrievalMonitor;
ConsumerMap consumers;
ReturnedMessageHandler* returnsHandler;
@@ -109,10 +102,7 @@
const bool transactional;
framing::ProtocolVersion version;
- void enqueue();
void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
void deliver(Consumer& consumer, Message& msg);
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
@@ -307,7 +297,8 @@
* receive this message on publication, the message will be
* returned (see setReturnedMessageHandler()).
*/
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
bool mandatory = false, bool immediate = false);
/**
@@ -352,8 +343,8 @@
* Closing a channel that is not open has no effect.
*/
void close(
- framing::ReplyCode = 200, const std::string& =OK,
- framing::ClassId = 0, framing::MethodId = 0);
+ framing::ReplyCode = 200, const std::string& ="OK",
+ framing::ClassId = 0, framing::MethodId = 0);
/**
* Set a handler for this channel that will process any
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.cpp Wed Feb 21 11:25:45 2007
@@ -19,7 +19,6 @@
*
*/
#include <ClientMessage.h>
-
using namespace qpid::client;
using namespace qpid::framing;
@@ -40,63 +39,63 @@
Message::~Message(){
}
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* Message::getHeaderProperties() const {
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-const std::string& Message::getContentType(){
+const std::string& Message::getContentType() const {
return getHeaderProperties()->getContentType();
}
-const std::string& Message::getContentEncoding(){
+const std::string& Message::getContentEncoding() const {
return getHeaderProperties()->getContentEncoding();
}
-FieldTable& Message::getHeaders(){
+FieldTable& Message::getHeaders() const {
return getHeaderProperties()->getHeaders();
}
-u_int8_t Message::getDeliveryMode(){
+u_int8_t Message::getDeliveryMode() const {
return getHeaderProperties()->getDeliveryMode();
}
-u_int8_t Message::getPriority(){
+u_int8_t Message::getPriority() const {
return getHeaderProperties()->getPriority();
}
-const std::string& Message::getCorrelationId(){
+const std::string& Message::getCorrelationId() const {
return getHeaderProperties()->getCorrelationId();
}
-const std::string& Message::getReplyTo(){
+const std::string& Message::getReplyTo() const {
return getHeaderProperties()->getReplyTo();
}
-const std::string& Message::getExpiration(){
+const std::string& Message::getExpiration() const {
return getHeaderProperties()->getExpiration();
}
-const std::string& Message::getMessageId(){
+const std::string& Message::getMessageId() const {
return getHeaderProperties()->getMessageId();
}
-u_int64_t Message::getTimestamp(){
+u_int64_t Message::getTimestamp() const {
return getHeaderProperties()->getTimestamp();
}
-const std::string& Message::getType(){
+const std::string& Message::getType() const {
return getHeaderProperties()->getType();
}
-const std::string& Message::getUserId(){
+const std::string& Message::getUserId() const {
return getHeaderProperties()->getUserId();
}
-const std::string& Message::getAppId(){
+const std::string& Message::getAppId() const {
return getHeaderProperties()->getAppId();
}
-const std::string& Message::getClusterId(){
+const std::string& Message::getClusterId() const {
return getHeaderProperties()->getClusterId();
}
@@ -154,4 +153,10 @@
void Message::setClusterId(const std::string& clusterId){
getHeaderProperties()->setClusterId(clusterId);
+}
+
+
+u_int64_t Message::getDeliveryTag() const {
+ BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get());
+ return deliver ? deliver->getDeliveryTag() : 0;
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ClientMessage.h Wed Feb 21 11:25:45 2007
@@ -25,89 +25,99 @@
#include <framing/amqp_framing.h>
namespace qpid {
+
namespace client {
+class IncomingMessage;
+
+/**
+ * A representation of messages for sent or recived through the
+ * client api.
+ *
+ * \ingroup clientapi
+ */
+class Message {
+ framing::AMQMethodBody::shared_ptr method;
+ framing::AMQHeaderBody::shared_ptr header;
+ std::string data;
+ bool redelivered;
+
+ // FIXME aconway 2007-02-20: const incorrect, needs const return type.
+ framing::BasicHeaderProperties* getHeaderProperties() const;
+ Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+
+ public:
+ Message(const std::string& data=std::string());
+ ~Message();
+
+ /**
+ * Allows the application to access the content of messages
+ * received.
+ *
+ * @return a string representing the data of the message
+ */
+ std::string getData() const { return data; }
+
+ /**
+ * Allows the application to set the content of messages to be
+ * sent.
+ *
+ * @param data a string representing the data of the message
+ */
+ void setData(const std::string& _data);
/**
- * A representation of messages for sent or recived through the
- * client api.
- *
- * \ingroup clientapi
+ * @return true if this message was delivered previously (to
+ * any consumer) but was not acknowledged.
*/
- class Message{
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- bool redelivered;
- u_int64_t deliveryTag;
+ bool isRedelivered(){ return redelivered; }
+ void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ u_int64_t getDeliveryTag() const;
+
+ const std::string& getContentType() const;
+ const std::string& getContentEncoding() const;
+ qpid::framing::FieldTable& getHeaders() const;
+ u_int8_t getDeliveryMode() const;
+ u_int8_t getPriority() const;
+ const std::string& getCorrelationId() const;
+ const std::string& getReplyTo() const;
+ const std::string& getExpiration() const;
+ const std::string& getMessageId() const;
+ u_int64_t getTimestamp() const;
+ const std::string& getType() const;
+ const std::string& getUserId() const;
+ const std::string& getAppId() const;
+ const std::string& getClusterId() const;
+
+ void setContentType(const std::string& type);
+ void setContentEncoding(const std::string& encoding);
+ void setHeaders(const qpid::framing::FieldTable& headers);
+ /**
+ * Sets the delivery mode. 1 = non-durable, 2 = durable.
+ */
+ void setDeliveryMode(u_int8_t mode);
+ void setPriority(u_int8_t priority);
+ void setCorrelationId(const std::string& correlationId);
+ void setReplyTo(const std::string& replyTo);
+ void setExpiration(const std::string& expiration);
+ void setMessageId(const std::string& messageId);
+ void setTimestamp(u_int64_t timestamp);
+ void setType(const std::string& type);
+ void setUserId(const std::string& userId);
+ void setAppId(const std::string& appId);
+ void setClusterId(const std::string& clusterId);
+
+ /** Get the method used to deliver this message */
+ boost::shared_ptr<framing::AMQMethodBody> getMethod() const
+ { return method; }
- public:
- Message(const std::string& data=std::string());
- ~Message();
-
- /**
- * Allows the application to access the content of messages
- * received.
- *
- * @return a string representing the data of the message
- */
- std::string getData() const { return data; }
-
- /**
- * Allows the application to set the content of messages to be
- * sent.
- *
- * @param data a string representing the data of the message
- */
- void setData(const std::string& _data);
-
- /**
- * @return true if this message was delivered previously (to
- * any consumer) but was not acknowledged.
- */
- inline bool isRedelivered(){ return redelivered; }
- inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
-
- inline u_int64_t getDeliveryTag(){ return deliveryTag; }
-
- const std::string& getContentType();
- const std::string& getContentEncoding();
- qpid::framing::FieldTable& getHeaders();
- u_int8_t getDeliveryMode();
- u_int8_t getPriority();
- const std::string& getCorrelationId();
- const std::string& getReplyTo();
- const std::string& getExpiration();
- const std::string& getMessageId();
- u_int64_t getTimestamp();
- const std::string& getType();
- const std::string& getUserId();
- const std::string& getAppId();
- const std::string& getClusterId();
-
- void setContentType(const std::string& type);
- void setContentEncoding(const std::string& encoding);
- void setHeaders(const qpid::framing::FieldTable& headers);
- /**
- * Sets the delivery mode. 1 = non-durable, 2 = durable.
- */
- void setDeliveryMode(u_int8_t mode);
- void setPriority(u_int8_t priority);
- void setCorrelationId(const std::string& correlationId);
- void setReplyTo(const std::string& replyTo);
- void setExpiration(const std::string& expiration);
- void setMessageId(const std::string& messageId);
- void setTimestamp(u_int64_t timestamp);
- void setType(const std::string& type);
- void setUserId(const std::string& userId);
- void setAppId(const std::string& appId);
- void setClusterId(const std::string& clusterId);
-
-
- // TODO aconway 2007-02-15: remove friendships.
- friend class Channel;
- };
+ void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
+ boost::shared_ptr<framing::AMQHeaderBody> getHeader();
+
+ // TODO aconway 2007-02-15: remove friendships.
+ friend class IncomingMessage;
+ friend class Channel;
+};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Wed Feb 21 11:25:45 2007
@@ -18,7 +18,9 @@
* under the License.
*
*/
+#include <algorithm>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
#include <Connection.h>
#include <ClientChannel.h>
@@ -27,7 +29,6 @@
#include <iostream>
#include <sstream>
#include <MethodBodyInstances.h>
-#include <boost/bind.hpp>
#include <functional>
using namespace qpid::framing;
@@ -83,15 +84,17 @@
{
if(isOpen) {
// TODO aconway 2007-01-29: Exception handling - could end up
- // partly closed.
+ // partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
new ConnectionCloseBody(
getVersion(), code, msg, classId, methodId));
- while(!channels.empty()) {
- channels.begin()->second->close();
- channels.erase(channels.begin());
- }
+
+ using boost::bind;
+ for_each(channels.begin(), channels.end(),
+ bind(&Channel::closeInternal,
+ bind(&ChannelMap::value_type::second, _1)));
+ channels.clear();
connector->close();
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.cpp Wed Feb 21 11:25:45 2007
@@ -19,58 +19,154 @@
*
*/
#include <IncomingMessage.h>
+#include "framing/AMQHeaderBody.h"
+#include "framing/AMQContentBody.h"
+#include "BasicGetOkBody.h"
+#include "BasicReturnBody.h"
+#include "BasicDeliverBody.h"
#include <QpidError.h>
#include <iostream>
-using namespace qpid::client;
-using namespace qpid::framing;
+namespace qpid {
+namespace client {
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+using namespace sys;
+using namespace framing;
-IncomingMessage::~IncomingMessage(){
-}
-
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
- data.append(content->getData());
-}
-
-bool IncomingMessage::isComplete(){
- return header != 0 && header->getContentSize() == data.size();
-}
-
-bool IncomingMessage::isReturn(){
- return returned;
-}
-
-bool IncomingMessage::isDelivery(){
- return delivered;
-}
-
-bool IncomingMessage::isResponse(){
- return response;
-}
-
-const string& IncomingMessage::getConsumerTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
- return delivered->getConsumerTag();
-}
-
-u_int64_t IncomingMessage::getDeliveryTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
- return delivered->getDeliveryTag();
-}
-
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
- return header;
-}
-
-std::string IncomingMessage::getData() const {
- return data;
+struct IncomingMessage::Guard: public Mutex::ScopedLock {
+ Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
+ im->shutdownError.throwIf();
+ }
+};
+
+IncomingMessage::IncomingMessage() { reset(); }
+
+void IncomingMessage::reset() {
+ state = &IncomingMessage::expectRequest;
+ endFn= &IncomingMessage::endRequest;
+ buildMessage = Message();
+}
+
+void IncomingMessage::startGet() {
+ Guard g(this);
+ if (state != &IncomingMessage::expectRequest) {
+ endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
+ }
+ else {
+ state = &IncomingMessage::expectGetOk;
+ endFn = &IncomingMessage::endGet;
+ getError.reset();
+ getState = GETTING;
+ }
+}
+
+bool IncomingMessage::waitGet(Message& msg) {
+ Guard g(this);
+ while (getState == GETTING && !shutdownError && !getError)
+ getReady.wait(lock);
+ shutdownError.throwIf();
+ getError.throwIf();
+ msg = getMessage;
+ return getState==GOT;
+}
+
+Message IncomingMessage::waitDispatch() {
+ Guard g(this);
+ while(dispatchQueue.empty() && !shutdownError)
+ dispatchReady.wait(lock);
+ shutdownError.throwIf();
+
+ Message msg(dispatchQueue.front());
+ dispatchQueue.pop();
+ return msg;
+}
+
+void IncomingMessage::add(BodyPtr body) {
+ Guard g(this);
+ shutdownError.throwIf();
+ // Call the current state function.
+ (this->*state)(body);
+}
+
+void IncomingMessage::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownError.reset(new ShutdownException());
+ getReady.notify();
+ dispatchReady.notify();
+}
+
+bool IncomingMessage::isShutdown() const {
+ Mutex::ScopedLock l(lock);
+ return shutdownError;
+}
+
+// Common check for all the expect functions. Called in network thread.
+template<class T>
+boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
+ boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
+ if (!ptr)
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+ return ptr;
+}
+
+void IncomingMessage::expectGetOk(BodyPtr body) {
+ if (dynamic_cast<BasicGetOkBody*>(body.get()))
+ state = &IncomingMessage::expectHeader;
+ else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
+ getState = EMPTY;
+ endGet();
+ }
+ else
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+}
+
+void IncomingMessage::expectHeader(BodyPtr body) {
+ AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
+ buildMessage.header = header;
+ state = &IncomingMessage::expectContent;
+ checkComplete();
+}
+
+void IncomingMessage::expectContent(BodyPtr body) {
+ AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
+ buildMessage.setData(buildMessage.getData() + content->getData());
+ checkComplete();
+}
+
+void IncomingMessage::checkComplete() {
+ size_t declaredSize = buildMessage.header->getContentSize();
+ size_t currentSize = buildMessage.getData().size();
+ if (declaredSize == currentSize)
+ (this->*endFn)(0);
+ else if (declaredSize < currentSize)
+ (this->*endFn)(new QPID_ERROR(
+ PROTOCOL_ERROR, "Message content exceeds declared size."));
+}
+
+void IncomingMessage::expectRequest(BodyPtr body) {
+ AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
+ buildMessage.setMethod(method);
+ state = &IncomingMessage::expectHeader;
+}
+
+void IncomingMessage::endGet(Exception* ex) {
+ getError.reset(ex);
+ if (getState == GETTING) {
+ getMessage = buildMessage;
+ getState = GOT;
+ }
+ reset();
+ getReady.notify();
+}
+
+void IncomingMessage::endRequest(Exception* ex) {
+ ExceptionHolder eh(ex);
+ if (!eh) {
+ dispatchQueue.push(buildMessage);
+ reset();
+ dispatchReady.notify();
+ }
+ eh.throwIf();
}
+}} // namespace qpid::client
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/IncomingMessage.h Wed Feb 21 11:25:45 2007
@@ -1,3 +1,6 @@
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,43 +22,97 @@
*
*/
#include <string>
-#include <vector>
+#include <queue>
#include <framing/amqp_framing.h>
+#include "ExceptionHolder.h"
+#include "ClientMessage.h"
+#include "sys/Mutex.h"
+#include "sys/Condition.h"
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
+namespace qpid {
-#include <ClientMessage.h>
+namespace framing {
+class AMQBody;
+}
-namespace qpid {
namespace client {
+/**
+ * Accumulates incoming message frames into messages.
+ * Client-initiated messages (basic.get) are initiated and made
+ * available to the user thread one at a time.
+ *
+ * Broker initiated messages (basic.return, basic.deliver) are
+ * queued for handling by the user dispatch thread.
+ */
+class IncomingMessage {
+ public:
+ typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
+ IncomingMessage();
+
+ /** Expect a new message starting with getOk. Called in user thread.*/
+ void startGet();
- class IncomingMessage{
- //content will be preceded by one of these method frames
- qpid::framing::BasicDeliverBody::shared_ptr delivered;
- qpid::framing::BasicReturnBody::shared_ptr returned;
- qpid::framing::BasicGetOkBody::shared_ptr response;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- public:
- IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
- ~IncomingMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr content);
- bool isComplete();
- bool isReturn();
- bool isDelivery();
- bool isResponse();
- const std::string& getConsumerTag();//only relevant if isDelivery()
- qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
- u_int64_t getDeliveryTag();
- std::string getData() const;
- };
+ /** Wait for the message to complete, return the message.
+ * Called in user thread.
+ *@raises QpidError if there was an error.
+ */
+ bool waitGet(Message&);
-}
-}
+ /** Wait for the next broker-initiated message. */
+ Message waitDispatch();
+
+ /** Add a frame body to the message. Called in network thread. */
+ void add(BodyPtr);
+
+ /** Shut down: all further calls to any function throw ex. */
+ void shutdown();
+
+ /** Check if shutdown */
+ bool isShutdown() const;
+
+ private:
+
+ typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
+ typedef void (IncomingMessage::* EndFn)(Exception*);
+ typedef std::queue<Message> MessageQueue;
+ struct Guard;
+ friend struct Guard;
+
+ void reset();
+ template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+
+ // State functions - a state machine where each state is
+ // a member function that processes a frame body.
+ void expectGetOk(BodyPtr);
+ void expectHeader(BodyPtr);
+ void expectContent(BodyPtr);
+ void expectRequest(BodyPtr);
+
+ // End functions.
+ void endGet(Exception* ex = 0);
+ void endRequest(Exception* ex);
+
+ // Check for complete message.
+ void checkComplete();
+
+ mutable sys::Mutex lock;
+ ExpectFn state;
+ EndFn endFn;
+ Message buildMessage;
+ ExceptionHolder shutdownError;
+
+ // For basic.get messages.
+ sys::Condition getReady;
+ ExceptionHolder getError;
+ Message getMessage;
+ enum { GETTING, GOT, EMPTY } getState;
+
+ // For broker-initiated messages
+ sys::Condition dispatchReady;
+ MessageQueue dispatchQueue;
+};
+
+}}
#endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.cpp Wed Feb 21 11:25:45 2007
@@ -59,16 +59,20 @@
Monitor::ScopedLock l(monitor);
while (waiting)
monitor.wait();
- if (!response) {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
- }
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ getResponse(); // Check for closed.
+ if(!validate(response->amqpClassId(), response->amqpMethodId())) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
% c % m % response->amqpClassId() % response->amqpMethodId());
}
+}
+
+framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() {
+ if (!response)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ return response;
}
RequestId ResponseHandler::getRequestId() {
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/ResponseHandler.h Wed Feb 21 11:25:45 2007
@@ -42,7 +42,7 @@
~ResponseHandler();
bool isWaiting(){ return waiting; }
- framing::AMQMethodBody::shared_ptr getResponse(){ return response;}
+ framing::AMQMethodBody::shared_ptr getResponse();
void waitForResponse();
void signalResponse(framing::AMQMethodBody::shared_ptr response);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.cpp Wed Feb 21 11:25:45 2007
@@ -39,4 +39,8 @@
void Exception::throwSelf() const { throw *this; }
+ShutdownException::ShutdownException() : Exception("Shut down.") {}
+
+EmptyException::EmptyException() : Exception("Empty.") {}
+
} // namespace qpid
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Exception.h Wed Feb 21 11:25:45 2007
@@ -66,6 +66,7 @@
template <class T>
ChannelException(framing::ReplyCode code_, const T& message)
: Exception(message), code(code_) {}
+ void throwSelf() const { throw *this; }
};
struct ConnectionException : public Exception {
@@ -73,8 +74,23 @@
template <class T>
ConnectionException(framing::ReplyCode code_, const T& message)
: Exception(message), code(code_) {}
+ void throwSelf() const { throw *this; }
};
+/**
+ * Exception used to indicate that a thread should shut down.
+ * Does not indicate an error that should be signalled to the user.
+ */
+struct ShutdownException : public Exception {
+ ShutdownException();
+ void throwSelf() const { throw *this; }
+};
+
+/** Exception to indicate empty queue or other empty state */
+struct EmptyException : public Exception {
+ EmptyException();
+ void throwSelf() const { throw *this; }
+};
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/ExceptionHolder.h Wed Feb 21 11:25:45 2007
@@ -19,11 +19,15 @@
*
*/
+#include <assert.h>
#include <Exception.h>
#include <boost/shared_ptr.hpp>
namespace qpid {
+// FIXME aconway 2007-02-20: Not necessary, a simple
+// Exception::shared_ptr will do the job. Remove
+//
/**
* Holder for a heap-allocated exc eption that can be stack allocated
* and thrown safely.
@@ -49,11 +53,11 @@
~ExceptionHolder() throw() {}
- const char* what() const throw() { return (*this)->what(); }
- std::string toString() const throw() { return (*this)->toString(); }
- virtual Exception* clone() const throw() { return (*this)->clone(); }
- virtual void throwSelf() const { (*this)->throwSelf(); }
- virtual void throwIf() const { if (*this) (*this)->throwSelf(); }
+ const char* what() const throw() { return get()->what(); }
+ std::string toString() const throw() { return get()->toString(); }
+ Exception* clone() const throw() { return get()->clone(); }
+ void throwIf() const { if (get()) get()->throwSelf(); }
+ void throwSelf() const { assert(get()); get()->throwSelf(); }
};
} // namespace qpid
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ChannelAdapter.h Wed Feb 21 11:25:45 2007
@@ -34,6 +34,8 @@
class MethodContext;
+// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel.
+
/**
* Base class for client and broker channels.
*
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h?view=auto&rev=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h Wed Feb 21 11:25:45 2007
@@ -0,0 +1,128 @@
+#ifndef _sys_Condition_h
+#define _sys_Condition_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+#include <sys/Mutex.h>
+#include <sys/Time.h>
+
+#ifdef USE_APR
+# include <apr_thread_cond.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A condition variable for thread synchronization.
+ */
+class Condition
+{
+ public:
+ inline Condition();
+ inline ~Condition();
+ inline void wait(Mutex&);
+ inline bool wait(Mutex&, const Time& absoluteTime);
+ inline void notify();
+ inline void notifyAll();
+
+ private:
+#ifdef USE_APR
+ apr_thread_cond_t* condition;
+#else
+ pthread_cond_t condition;
+#endif
+};
+
+
+// APR ================================================================
+#ifdef USE_APR
+
+Condition::Condition() {
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Condition::~Condition() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){
+ // APR uses microseconds.
+ apr_status_t status =
+ apr_thread_cond_timedwait(
+ condition, mutex.mutex, absoluteTime/TIME_USEC);
+ if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
+ return status == 0;
+}
+
+void Condition::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Condition::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+#else
+// POSIX ================================================================
+
+Condition::Condition() {
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
+}
+
+Condition::~Condition() {
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
+}
+
+void Condition::wait(Mutex& mutex) {
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+}
+
+bool Condition::wait(Mutex& mutex, const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
+ }
+ return true;
+}
+
+void Condition::notify(){
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+}
+
+void Condition::notifyAll(){
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
+}
+#endif /*USE_APR*/
+
+
+}}
+#endif /*!_sys_Condition_h*/
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Condition.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Monitor.h Wed Feb 21 11:25:45 2007
@@ -23,9 +23,7 @@
*/
#include <sys/errno.h>
-#include <boost/noncopyable.hpp>
-#include <sys/Mutex.h>
-#include <sys/Time.h>
+#include <sys/Condition.h>
#ifdef USE_APR
# include <apr_thread_cond.h>
@@ -37,91 +35,21 @@
/**
* A monitor is a condition variable and a mutex
*/
-class Monitor : public Mutex
-{
+class Monitor : public Mutex, public Condition {
public:
- inline Monitor();
- inline ~Monitor();
+ using Condition::wait;
inline void wait();
inline bool wait(const Time& absoluteTime);
- inline void notify();
- inline void notifyAll();
-
- private:
-#ifdef USE_APR
- apr_thread_cond_t* condition;
-#else
- pthread_cond_t condition;
-#endif
};
-// APR ================================================================
-#ifdef USE_APR
-
-Monitor::Monitor() {
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
-}
-
-Monitor::~Monitor() {
- CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
-}
-
-void Monitor::wait() {
- CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
-}
-
-bool Monitor::wait(const Time& absoluteTime){
- // APR uses microseconds.
- apr_status_t status =
- apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
- if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
- return status == 0;
-}
-
-void Monitor::notify(){
- CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
-}
-
-void Monitor::notifyAll(){
- CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
-}
-
-#else
-// POSIX ================================================================
-
-Monitor::Monitor() {
- QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
-}
-
-Monitor::~Monitor() {
- QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
-}
-
void Monitor::wait() {
- QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
-}
-
-bool Monitor::wait(const Time& absoluteTime){
- struct timespec ts;
- toTimespec(ts, absoluteTime);
- int status = pthread_cond_timedwait(&condition, &mutex, &ts);
- if (status != 0) {
- if (status == ETIMEDOUT) return false;
- throw QPID_POSIX_ERROR(status);
- }
- return true;
+ Condition::wait(*this);
}
-void Monitor::notify(){
- QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
+bool Monitor::wait(const Time& absoluteTime) {
+ return Condition::wait(*this, absoluteTime);
}
-
-void Monitor::notifyAll(){
- QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
-}
-#endif /*USE_APR*/
-
}}
#endif /*!_sys_Monitor_h*/
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/sys/Mutex.h Wed Feb 21 11:25:45 2007
@@ -32,6 +32,8 @@
namespace qpid {
namespace sys {
+class Condition;
+
/**
* Scoped lock template: calls lock() in ctor, unlock() in dtor.
* L can be any class with lock() and unlock() functions.
@@ -76,6 +78,7 @@
#else
pthread_mutex_t mutex;
#endif
+ friend class Condition;
};
#ifdef USE_APR
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp?view=diff&rev=510161&r1=510160&r2=510161
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/ClientChannelTest.cpp Wed Feb 21 11:25:45 2007
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <vector>
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "ClientChannel.h"
@@ -28,6 +29,7 @@
using namespace std;
using namespace boost;
using namespace qpid::client;
+using namespace qpid::sys;
using namespace qpid::framing;
/**
@@ -36,45 +38,125 @@
class ClientChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ClientChannelTest);
- CPPUNIT_TEST(testGet);
- CPPUNIT_TEST(testConsume);
+ CPPUNIT_TEST(testPublishGet);
+ CPPUNIT_TEST(testGetNoContent);
+ CPPUNIT_TEST(testConsumeCancel);
+ CPPUNIT_TEST(testConsumePublished);
CPPUNIT_TEST_SUITE_END();
+ struct Listener: public qpid::client::MessageListener {
+ vector<Message> messages;
+ Monitor monitor;
+ void received(Message& msg) {
+ Mutex::ScopedLock l(monitor);
+ messages.push_back(msg);
+ monitor.notifyAll();
+ }
+ };
+
InProcessBrokerClient connection; // client::connection + local broker
Channel channel;
- const std::string key;
+ const std::string qname;
const std::string data;
Queue queue;
Exchange exchange;
+ Listener listener;
public:
ClientChannelTest()
- : key("testq"), data("hello"),
- queue(key, true), exchange("", Exchange::DIRECT_EXCHANGE)
+ : qname("testq"), data("hello"),
+ queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{
connection.openChannel(channel);
CPPUNIT_ASSERT(channel.getId() != 0);
channel.declareQueue(queue);
}
- void testGet() {
- // FIXME aconway 2007-02-16: Must fix thread safety bug
- // in ClientChannel::get for this to pass.
- return;
-
+ void testPublishGet() {
Message pubMsg(data);
- channel.publish(pubMsg, exchange, key);
+ pubMsg.getHeaders().setString("hello", "world");
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- channel.get(getMsg, queue);
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
+ CPPUNIT_ASSERT_EQUAL(string("world"),
+ getMsg.getHeaders().getString("hello"));
+ CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
}
- void testConsume() {
+ void testGetNoContent() {
+ Message pubMsg;
+ pubMsg.getHeaders().setString("hello", "world");
+ channel.publish(pubMsg, exchange, qname);
+ Message getMsg;
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(getMsg.getData().empty());
+ CPPUNIT_ASSERT_EQUAL(string("world"),
+ getMsg.getHeaders().getString("hello"));
}
+
+ void testConsumeCancel() {
+ string tag; // Broker assigned
+ channel.consume(queue, tag, &listener);
+ channel.start();
+ CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+ channel.publish(Message("a"), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ Time deadline(now() + 1*TIME_SEC);
+ while (listener.messages.size() != 1) {
+ CPPUNIT_ASSERT(listener.monitor.wait(deadline));
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
+ CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
+
+ channel.publish(Message("b"), exchange, qname);
+ channel.publish(Message("c"), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 3) {
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ }
+ CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+ CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
+ CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
+ channel.cancel(tag);
+ channel.publish(Message("d"), exchange, qname);
+ CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
+ }
+ Message msg;
+ CPPUNIT_ASSERT(channel.get(msg, queue));
+ CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
+ }
+
+ // Consume already-published messages
+ void testConsumePublished() {
+ Message pubMsg("x");
+ pubMsg.getHeaders().setString("y", "z");
+ channel.publish(pubMsg, exchange, qname);
+ string tag;
+ channel.consume(queue, tag, &listener);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
+ channel.start();
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 1)
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ CPPUNIT_ASSERT_EQUAL(string("x"), listener.messages[0].getData());
+ CPPUNIT_ASSERT_EQUAL(string("z"),
+ listener.messages[0].getHeaders().getString("y"));
+ }
- // FIXME aconway 2007-02-15: Cover full channel API
+
+
};
// Make this test suite a plugin.