You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/11 13:25:28 UTC
svn commit: r574551 - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/
src/qpid/client/ src/qpid/framing/ src/tests/
Author: gsim
Date: Tue Sep 11 04:25:27 2007
New Revision: 574551
URL: http://svn.apache.org/viewvc?rev=574551&view=rev
Log:
Moved old ClientChannel class from using basic to using message for publish & consume.
(Get and qos still use the basic class's definitions; that will be changed next.)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/structs.rb Tue Sep 11 04:25:27 2007
@@ -152,6 +152,9 @@
def define_accessors(f)
genl "void set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) { #{f.cppname} = _#{f.cppname}; }"
genl "#{f.cpptype.ret} get#{f.name.caps}() const { return #{f.cppname}; }"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& get#{f.name.caps}() { return #{f.cppname}; }"
+ end
end
def define_struct(s)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Tue Sep 11 04:25:27 2007
@@ -136,7 +136,7 @@
}
void Channel::consume(
- Queue& queue, const std::string& tag, MessageListener* listener,
+ Queue& _queue, const std::string& tag, MessageListener* listener,
AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
if (tag.empty()) {
@@ -152,10 +152,11 @@
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
+ uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
- session.basicConsume(0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable());
+ session.messageSubscribe(0, _queue.getName(), tag, noLocal,
+ confirmMode, 0/*pre-acquire*/,
+ false, fields ? *fields : FieldTable());
}
void Channel::cancel(const std::string& tag, bool synch) {
@@ -169,7 +170,7 @@
consumers.erase(i);
}
ScopedSync s(session, synch);
- session.basicCancel(tag);
+ session.messageCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
@@ -184,14 +185,13 @@
}
}
-void Channel::publish(const Message& msg, const Exchange& exchange,
+void Channel::publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
- bool mandatory, bool immediate) {
+ bool mandatory, bool /*immediate TODO-restore immediate?*/) {
- const string e = exchange.getName();
- string key = routingKey;
-
- session.basicPublish(0, e, key, mandatory, immediate, msg);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
+ session.messageTransfer((destination=exchange.getName(), content=msg));
}
void Channel::close()
@@ -222,20 +222,27 @@
}
}
+void Channel::dispatch(FrameSet& content, const std::string& destination)
+{
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
+ Message msg;
+ msg.populate(content);
+ i->second.listener->received(msg);
+ } else {
+ QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
+ }
+}
+
void Channel::run() {
try {
while (true) {
FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
- ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
- if (i != consumers.end()) {
- Message msg;
- msg.populate(*content);
- i->second.listener->received(msg);
- } else {
- QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
- }
+ dispatch(*content, content->as<BasicDeliverBody>()->getConsumerTag());
+ } else if (content->isA<MessageTransferBody>()) {
+ dispatch(*content, content->as<MessageTransferBody>()->getDestination());
} else if (content->isA<BasicGetOkBody>()) {
gets.push(content);
} else {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Tue Sep 11 04:25:27 2007
@@ -93,6 +93,8 @@
void closeInternal();
void join();
+ void dispatch(framing::FrameSet& msg, const std::string& destination);
+
// FIXME aconway 2007-02-23: Get rid of friendships.
friend class Connection;
@@ -301,7 +303,7 @@
* receive this message on publication, the message will be
* returned (see setReturnedMessageHandler()).
*/
- void publish(const Message& msg, const Exchange& exchange,
+ void publish(Message& msg, const Exchange& exchange,
const std::string& routingKey,
bool mandatory = false, bool immediate = false);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Tue Sep 11 04:25:27 2007
@@ -22,13 +22,7 @@
*
*/
#include <string>
-#include "qpid/framing/BasicHeaderProperties.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/TransferContent.h"
namespace qpid {
namespace client {
@@ -39,49 +33,37 @@
*
* \ingroup clientapi
*/
-// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
-// basic header properties.
-class Message : public framing::BasicHeaderProperties, public framing::MethodContent {
- public:
- Message(const std::string& data_=std::string()) : data(data_) {}
-
- const std::string& getData() const { return data; }
- void setData(const std::string& _data) { data = _data; }
-
- std::string getDestination() const { return destination; }
- void setDestination(const std::string& dest) { destination = dest; }
-
- // TODO aconway 2007-03-22: only needed for Basic.deliver support.
- uint64_t getDeliveryTag() const { return deliveryTag; }
- void setDeliveryTag(uint64_t dt) { deliveryTag = dt; }
+class Message : public framing::TransferContent
+{
+public:
+ Message(const std::string& data_=std::string()) : TransferContent(data_) {}
- bool isRedelivered() const { return redelivered; }
- void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+ std::string getDestination() const
+ {
+ return destination;
+ }
+
+ void setDestination(const std::string& dest)
+ {
+ destination = dest;
+ }
- framing::AMQHeaderBody getHeader() const
+ bool isRedelivered() const
{
- framing::AMQHeaderBody header;
- BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
- BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
- properties->setContentLength(data.size());
- return header;
+ return hasDeliveryProperties() && getDeliveryProperties().getRedelivered();
}
- //TODO: move this elsewhere (GRS 24/08/2007)
- void populate(framing::FrameSet& frameset)
- {
- const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
- if (properties) {
- BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
- }
- frameset.getContent(data);
+ void setRedelivered(bool redelivered) {
+ getDeliveryProperties().setRedelivered(redelivered);
+ }
+
+ framing::FieldTable& getHeaders()
+ {
+ return getMessageProperties().getApplicationHeaders();
}
- private:
- std::string data;
+private:
std::string destination;
- bool redelivered;
- uint64_t deliveryTag;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Tue Sep 11 04:25:27 2007
@@ -61,4 +61,35 @@
return *header.get<DeliveryProperties>(true);
}
+void TransferContent::populate(const FrameSet& frameset)
+{
+ header = *frameset.getHeaders();
+ frameset.getContent(data);
+}
+
+const MessageProperties& TransferContent::getMessageProperties() const
+{
+ const MessageProperties* props = header.get<MessageProperties>();
+ if (!props) throw NoSuchPropertiesException();
+ return *props;
+}
+
+const DeliveryProperties& TransferContent::getDeliveryProperties() const
+{
+ const DeliveryProperties* props = header.get<DeliveryProperties>();
+ if (!props) throw NoSuchPropertiesException();
+ return *props;
+}
+
+bool TransferContent::hasMessageProperties() const
+{
+ return header.get<MessageProperties>();
+}
+
+bool TransferContent::hasDeliveryProperties() const
+{
+ return header.get<DeliveryProperties>();
+}
+
+
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Tue Sep 11 04:25:27 2007
@@ -21,13 +21,17 @@
#ifndef _TransferContent_
#define _TransferContent_
+#include "FrameSet.h"
#include "MethodContent.h"
+#include "qpid/Exception.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/DeliveryProperties.h"
namespace qpid {
namespace framing {
+struct NoSuchPropertiesException : public Exception {};
+
class TransferContent : public MethodContent
{
AMQHeaderBody header;
@@ -37,9 +41,16 @@
AMQHeaderBody getHeader() const;
void setData(const std::string&);
void appendData(const std::string&);
- const std::string& getData() const;
MessageProperties& getMessageProperties();
DeliveryProperties& getDeliveryProperties();
+
+ const std::string& getData() const;
+ const MessageProperties& getMessageProperties() const;
+ const DeliveryProperties& getDeliveryProperties() const;
+ bool hasMessageProperties() const;
+ bool hasDeliveryProperties() const;
+
+ void populate(const FrameSet& frameset);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp?rev=574551&r1=574550&r2=574551&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp Tue Sep 11 04:25:27 2007
@@ -47,6 +47,7 @@
using qpid::TestCase;
using qpid::TestOptions;
using qpid::framing::FieldTable;
+using qpid::framing::ReplyTo;
using namespace std;
class DummyRun : public TestCase
@@ -73,14 +74,14 @@
const string topic;
TestMap::iterator test;
auto_ptr<Thread> runner;
- string reportTo;
+ ReplyTo reportTo;
string reportCorrelator;
void shutdown();
bool invite(const string& name);
void run();
- void sendResponse(Message& response, string replyTo);
+ void sendResponse(Message& response, ReplyTo replyTo);
void sendResponse(Message& response, Message& request);
void sendSimpleResponse(const string& type, Message& request);
void sendReport();
@@ -146,30 +147,19 @@
response.getHeaders().setString("CONTROL_TYPE", type);
response.getHeaders().setString("CLIENT_NAME", name);
response.getHeaders().setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
- response.setCorrelationId(request.getCorrelationId());
+ response.getMessageProperties().setCorrelationId(request.getMessageProperties().getCorrelationId());
sendResponse(response, request);
}
void Listener::sendResponse(Message& response, Message& request)
{
- sendResponse(response, request.getReplyTo());
+ sendResponse(response, request.getMessageProperties().getReplyTo());
}
-void Listener::sendResponse(Message& response, string replyTo)
+void Listener::sendResponse(Message& response, ReplyTo replyTo)
{
- //Exchange and routing key need to be extracted from the reply-to
- //field. Format is assumed to be:
- //
- // <exchange type>://<exchange name>/<routing key>?<options>
- //
- //and all we need is the exchange name and routing key
- //
- if (replyTo.empty()) throw qpid::Exception("Reply address not set!");
- const string delims(":/?=");
-
- string::size_type start = replyTo.find(':');//skip exchange type
- string exchange = parse_next_word(replyTo, delims, start);
- string routingKey = parse_next_word(replyTo, delims, start);
+ string exchange = replyTo.getExchangeName();
+ string routingKey = replyTo.getRoutingKey();
channel.publish(response, exchange, routingKey);
}
@@ -188,12 +178,12 @@
test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
sendSimpleResponse("ACCEPT_ROLE", message);
} else if (type == "START") {
- reportTo = message.getReplyTo();
- reportCorrelator = message.getCorrelationId();
+ reportTo = message.getMessageProperties().getReplyTo();
+ reportCorrelator = message.getMessageProperties().getCorrelationId();
runner = auto_ptr<Thread>(new Thread(this));
} else if (type == "STATUS_REQUEST") {
- reportTo = message.getReplyTo();
- reportCorrelator = message.getCorrelationId();
+ reportTo = message.getMessageProperties().getReplyTo();
+ reportCorrelator = message.getMessageProperties().getCorrelationId();
test->stop();
sendReport();
} else if (type == "TERMINATE") {
@@ -229,7 +219,7 @@
Message report;
report.getHeaders().setString("CONTROL_TYPE", "REPORT");
test->report(report);
- report.setCorrelationId(reportCorrelator);
+ report.getMessageProperties().setCorrelationId(reportCorrelator);
sendResponse(report, reportTo);
}