You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/21 20:12:15 UTC
svn commit: r520972 - in /incubator/qpid/trunk/qpid/cpp: lib/client/
lib/common/ lib/common/framing/ tests/
Author: aconway
Date: Wed Mar 21 12:12:14 2007
New Revision: 520972
URL: http://svn.apache.org/viewvc?view=rev&rev=520972
Log:
Refactored client side for dual-mode Channel supporting either 0-9 Message or 0-8 Basic.
Added:
incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp (with props)
incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h (with props)
incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am
incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am
incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h
incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h
incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
Added: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,102 @@
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ *
+ * \ingroup clientapi
+ */
+enum AckMode {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_AckMode_h*/
+#ifndef _client_AckMode_h
+#define _client_AckMode_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * The available acknowledgements modes.
+ *
+ * \ingroup clientapi
+ */
+enum AckMode {
+ /** No acknowledgement will be sent, broker can
+ discard messages as soon as they are delivered
+ to a consumer using this mode. **/
+ NO_ACK = 0,
+ /** Each message will be automatically
+ acknowledged as soon as it is delivered to the
+ application **/
+ AUTO_ACK = 1,
+ /** Acknowledgements will be sent automatically,
+ but not for each message. **/
+ LAZY_ACK = 2,
+ /** The application is responsible for explicitly
+ acknowledging messages. **/
+ CLIENT_ACK = 3
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_AckMode_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/AckMode.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp Wed Mar 21 12:12:14 2007
@@ -0,0 +1,261 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "BasicMessageChannel.h"
+#include "AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "framing/FieldTable.h"
+#include "Connection.h"
+
+using namespace std;
+
+namespace qpid {
+namespace client {
+
+using namespace sys;
+using namespace framing;
+
+BasicMessageChannel::BasicMessageChannel(Channel& ch)
+ : channel(ch), returnsHandler(0) {}
+
+void BasicMessageChannel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if (synch) {
+ BasicConsumeOkBody::shared_ptr response =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
+ channel.responses.getResponse());
+ tag = response->getConsumerTag();
+ }
+ // FIXME aconway 2007-02-20: Race condition!
+ // We could receive the first message for the consumer
+ // before we create the consumer below.
+ // Move consumer creation to handler for BasicConsumeOkBody
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+}
+
+
+void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(channel.version, tag, !synch));
+}
+
+void BasicMessageChannel::close(){
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
+ }
+ incoming.shutdown();
+}
+
+
+
+bool BasicMessageChannel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+ // Expect a message starting with a BasicGetOk
+ incoming.startGet();
+ channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ return incoming.waitGet(msg);
+}
+
+void BasicMessageChannel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ msg.getHeader()->setContentSize(msg.getData().size());
+ const string e = exchange.getName();
+ string key = routingKey;
+ channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
+ //break msg up into header frame and content frame(s) and send these
+ channel.send(msg.getHeader());
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ channel.send(new AMQContentBody(data));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ channel.send(new AMQContentBody(frag));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+ assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+ switch(method->amqpMethodId()) {
+ case BasicDeliverBody::METHOD_ID:
+ case BasicReturnBody::METHOD_ID:
+ case BasicGetOkBody::METHOD_ID:
+ case BasicGetEmptyBody::METHOD_ID:
+ incoming.add(method);
+ return;
+ }
+ throw Channel::UnknownMethod();
+}
+
+void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr body){
+ incoming.add(body);
+}
+
+void BasicMessageChannel::handle(AMQContentBody::shared_ptr body){
+ incoming.add(body);
+}
+
+void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){
+ //record delivery tag:
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer.listener->received(msg);
+
+ if(channel.isOpen()){
+ bool multiple(false);
+ switch(consumer.ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer.count) < channel.getPrefetch())
+ break;
+ //else drop-through
+ case AUTO_ACK:
+ consumer.lastDeliveryTag = 0;
+ channel.send(
+ new BasicAckBody(
+ channel.version, msg.getDeliveryTag(), multiple));
+ case NO_ACK: // Nothing to do
+ case CLIENT_ACK: // User code must ack.
+ break;
+ // TODO aconway 2007-02-22: Provide a way for user
+ // to ack!
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+
+void BasicMessageChannel::run() {
+ while(channel.isOpen()) {
+ try {
+ Message msg = incoming.waitDispatch();
+ if(msg.getMethod()->isA<BasicReturnBody>()) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler == 0) {
+ // TODO aconway 2007-02-20: proper logging.
+ cout << "Message returned: " << msg.getData() << endl;
+ }
+ else
+ handler->returned(msg);
+ }
+ else {
+ BasicDeliverBody::shared_ptr deliverBody =
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(
+ msg.getMethod());
+ std::string tag = deliverBody->getConsumerTag();
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" + tag);
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
+ }
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::Basic::run() terminated by: " << e.toString()
+ << "(" << typeid(e).name() << ")" << endl;
+ }
+ }
+}
+
+void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
+ returnsHandler = handler;
+}
+
+void BasicMessageChannel::setQos(){
+ channel.sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ if(channel.isTransactional())
+ channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+}
+
+}} // namespace qpid::client
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,87 @@
+#ifndef _client_BasicMessageChannel_h
+#define _client_BasicMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation using AMQP 0-8 BasicMessageChannel class
+ * to send and receiving messages.
+ */
+class BasicMessageChannel : public MessageChannel
+{
+ public:
+ BasicMessageChannel(Channel& parent);
+
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ void cancel(const std::string& tag, bool synch = true);
+
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ void run();
+
+ void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+ void handle(shared_ptr<framing::AMQHeaderBody>);
+
+ void handle(shared_ptr<framing::AMQContentBody>);
+
+ void setQos();
+
+ void close();
+
+ private:
+
+ struct Consumer{
+ MessageListener* listener;
+ AckMode ackMode;
+ int count;
+ u_int64_t lastDeliveryTag;
+ };
+
+ typedef std::map<std::string, Consumer> ConsumerMap;
+
+ void deliver(Consumer& consumer, Message& msg);
+
+ sys::Mutex lock;
+ Channel& channel;
+ IncomingMessage incoming;
+ ConsumerMap consumers;
+ ReturnedMessageHandler* returnsHandler;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_BasicMessageChannel_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/BasicMessageChannel.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.cpp Wed Mar 21 12:12:14 2007
@@ -25,6 +25,9 @@
#include <QpidError.h>
#include <MethodBodyInstances.h>
#include "Connection.h"
+#include "BasicMessageChannel.h"
+// FIXME aconway 2007-03-21:
+//#include "MessageMessageChannel.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -36,8 +39,10 @@
using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(bool _transactional, uint16_t _prefetch) :
- basic(*this),
+Channel::Channel(bool _transactional, u_int16_t _prefetch,
+ MessageChannel* impl) :
+ // FIXME aconway 2007-03-21: MessageMessageChannel
+ messaging(impl ? impl : new BasicMessageChannel(*this)),
connection(0),
prefetch(_prefetch),
transactional(_transactional)
@@ -115,7 +120,7 @@
bool Channel::isOpen() const { return connection; }
void Channel::setQos() {
- basic.setQos();
+ messaging->setQos();
// FIXME aconway 2007-02-22: message
}
@@ -192,7 +197,7 @@
}
try {
switch (method->amqpClassId()) {
- case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+ case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
@@ -226,11 +231,11 @@
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- basic.incoming.add(body);
+ messaging->handle(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- basic.incoming.add(body);
+ messaging->handle(body);
}
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -238,7 +243,7 @@
}
void Channel::start(){
- basicDispatcher = Thread(basic);
+ dispatcher = Thread(*messaging);
}
// Close called by local application.
@@ -274,13 +279,12 @@
void Channel::closeInternal() {
if (isOpen());
{
- basic.cancelAll();
- basic.incoming.shutdown();
+ messaging->close();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- basicDispatcher.join();
+ dispatcher.join();
}
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)
@@ -299,4 +303,31 @@
send(body);
}
+void Channel::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
+ messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields);
+}
+
+void Channel::cancel(const std::string& tag, bool synch) {
+ messaging->cancel(tag, synch);
+}
+
+bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
+ return messaging->get(msg, queue, ackMode);
+}
+
+void Channel::publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory, bool immediate) {
+ messaging->publish(msg, exchange, routingKey, mandatory, immediate);
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) {
+ messaging->setReturnedMessageHandler(handler);
+}
+
+void Channel::run() {
+ messaging->run();
+}
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientChannel.h Wed Mar 21 12:12:14 2007
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "sys/types.h"
+#include <boost/scoped_ptr.hpp>
#include <framing/amqp_framing.h>
#include <ClientExchange.h>
#include <ClientMessage.h>
@@ -29,7 +29,7 @@
#include <ResponseHandler.h>
#include "ChannelAdapter.h"
#include "Thread.h"
-#include "Basic.h"
+#include "AckMode.h"
namespace qpid {
@@ -41,7 +41,9 @@
namespace client {
class Connection;
-
+class MessageChannel;
+class MessageListener;
+class ReturnedMessageHandler;
/**
* Represents an AMQP channel, i.e. loosely a session of work. It
@@ -53,16 +55,12 @@
class Channel : public framing::ChannelAdapter
{
private:
- // TODO aconway 2007-02-22: Remove friendship.
- friend class Basic;
- // FIXME aconway 2007-02-22: friend class Message;
-
struct UnknownMethod {};
sys::Mutex lock;
- Basic basic;
+ boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
- sys::Thread basicDispatcher;
+ sys::Thread dispatcher;
ResponseHandler responses;
uint16_t prefetch;
@@ -107,7 +105,10 @@
void closeInternal();
void peerClose(boost::shared_ptr<framing::ChannelCloseBody>);
+ // FIXME aconway 2007-02-23: Get rid of friendships.
friend class Connection;
+ friend class BasicMessageChannel; // for sendAndReceive.
+ friend class MessageMessageChannel; // for sendAndReceive.
public:
@@ -121,8 +122,15 @@
* @param prefetch specifies the number of unacknowledged
* messages the channel is willing to have sent to it
* asynchronously
- */
- Channel(bool transactional = false, uint16_t prefetch = 500);
+ *
+ * @param messageImpl Alternate messaging implementation class to
+ * allow alternate protocol implementations of messaging
+ * operations. Takes ownership.
+ */
+ Channel(
+ bool transactional = false, u_int16_t prefetch = 500,
+ MessageChannel* messageImpl = 0);
+
~Channel();
/**
@@ -190,13 +198,6 @@
bool synch = true);
/**
- * Get a Basic object which provides functions to send and
- * receive messages using the AMQP 0-8 Basic class methods.
- *@see Basic
- */
- Basic& getBasic() { return basic; }
-
- /**
* For a transactional channel this will commit all
* publications and acknowledgements since the last commit (or
* the channel was opened if there has been no previous
@@ -243,6 +244,106 @@
/** True if the channel is open */
bool isOpen() const;
+
+ /** Get the connection associated with this channel */
+ Connection& getConnection() { return *connection; }
+
+ /** Return the protocol version */
+ framing::ProtocolVersion getVersion() const { return version ; }
+
+ /**
+ * Creates a 'consumer' for a queue. Messages in (or arriving
+ * at) that queue will be delivered to consumers
+ * asynchronously.
+ *
+ * @param queue a Queue instance representing the queue to
+ * consume from
+ *
+ * @param tag an identifier to associate with the consumer
+ * that can be used to cancel its subscription (if empty, this
+ * will be assigned by the broker)
+ *
+ * @param listener a pointer to an instance of an
+ * implementation of the MessageListener interface. Messages
+ * received from this queue for this consumer will result in
+ * invocation of the received() method on the listener, with
+ * the message itself passed in.
+ *
+ * @param ackMode the mode of acknowledgement that the broker
+ * should assume for this consumer. @see AckMode
+ *
+ * @param noLocal if true, this consumer will not be sent any
+ * message published by this connection
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0);
+
+ /**
+ * Cancels a subscription previously set up through a call to consume().
+ *
+ * @param tag the identifier used (or assigned) in the consume
+ * request that set up the subscription to be cancelled.
+ *
+ * @param synch if true this call will block until a response
+ * is received from the broker
+ */
+ void cancel(const std::string& tag, bool synch = true);
+ /**
+ * Synchronous pull of a message from a queue.
+ *
+ * @param msg a message object that will contain the message
+ * headers and content if the call completes.
+ *
+ * @param queue the queue to consume from
+ *
+ * @param ackMode the acknowledgement mode to use (@see
+ * AckMode)
+ *
+ * @return true if a message was succcessfully dequeued from
+ * the queue, false if the queue was empty.
+ */
+ bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+ /**
+ * Publishes (i.e. sends a message to the broker).
+ *
+ * @param msg the message to publish
+ *
+ * @param exchange the exchange to publish the message to
+ *
+ * @param routingKey the routing key to publish with
+ *
+ * @param mandatory if true and the exchange to which this
+ * publish is directed has no matching bindings, the message
+ * will be returned (see setReturnedMessageHandler()).
+ *
+ * @param immediate if true and there is no consumer to
+ * receive this message on publication, the message will be
+ * returned (see setReturnedMessageHandler()).
+ */
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false);
+
+ /**
+ * Set a handler for this channel that will process any
+ * returned messages
+ *
+ * @see publish()
+ */
+ void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+ /**
+ * Deliver messages from the broker to the appropriate MessageListener.
+ */
+ void run();
+
+
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/ClientMessage.cpp Wed Mar 21 12:12:14 2007
@@ -30,7 +30,6 @@
void Message::setData(const std::string& d) {
data = d;
- header->setContentSize(d.size());
}
Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/IncomingMessage.h Wed Mar 21 12:12:14 2007
@@ -43,6 +43,7 @@
*
* Broker initiated messages (basic.return, basic.deliver) are
* queued for handling by the user dispatch thread.
+ *
*/
class IncomingMessage {
public:
Modified: incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/Makefile.am Wed Mar 21 12:12:14 2007
@@ -14,7 +14,7 @@
ClientExchange.cpp \
ClientMessage.cpp \
ClientQueue.cpp \
- Basic.cpp \
+ BasicMessageChannel.cpp \
Connection.cpp \
Connector.cpp \
IncomingMessage.cpp \
@@ -22,14 +22,16 @@
ResponseHandler.cpp \
ReturnedMessageHandler.cpp
pkginclude_HEADERS = \
+ AckMode.h \
ClientChannel.h \
ClientExchange.h \
ClientMessage.h \
ClientQueue.h \
- Basic.h \
Connection.h \
Connector.h \
IncomingMessage.h \
+ MessageChannel.h \
+ BasicMessageChannel.h \
MessageListener.h \
MethodBodyInstances.h \
ResponseHandler.h \
Added: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,94 @@
+#ifndef _client_MessageChannel_h
+#define _client_MessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "shared_ptr.h"
+#include "sys/Runnable.h"
+#include "AckMode.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQMethodBody;
+class AMQHeaderBody;
+class AMQContentBody;
+class FieldTable;
+}
+
+namespace client {
+
+class Channel;
+class Message;
+class Queue;
+class Exchange;
+class MessageListener;
+class ReturnedMessageHandler;
+
+/**
+ * Abstract interface for messaging implementation for a channel.
+ *
+ *@see Channel for documentation.
+ */
+class MessageChannel : public sys::Runnable
+{
+ public:
+ /**@see Channel::consume */
+ virtual void consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+ const framing::FieldTable* fields = 0) = 0;
+
+ /**@see Channel::cancel */
+ virtual void cancel(const std::string& tag, bool synch = true) = 0;
+
+ /**@see Channel::get */
+ virtual bool get(
+ Message& msg, const Queue& queue, AckMode ackMode = NO_ACK) = 0;
+
+ /**@see Channel::get */
+ virtual void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
+ bool mandatory = false, bool immediate = false) = 0;
+
+ /**@see Channel::setReturnedMessageHandler */
+ virtual void setReturnedMessageHandler(
+ ReturnedMessageHandler* handler) = 0;
+
+ /** Handle an incoming method. */
+ virtual void handle(shared_ptr<framing::AMQMethodBody>) = 0;
+
+ /** Handle an incoming header */
+ virtual void handle(shared_ptr<framing::AMQHeaderBody>) = 0;
+
+ /** Handle an incoming content */
+ virtual void handle(shared_ptr<framing::AMQContentBody>) = 0;
+
+ /** Send channel's QOS settings */
+ virtual void setQos() = 0;
+
+ /** Channel is closing */
+ virtual void close() = 0;
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_MessageChannel_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/client/MessageChannel.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/Makefile.am Wed Mar 21 12:12:14 2007
@@ -117,6 +117,7 @@
$(framing)/amqp_framing.h \
$(framing)/amqp_types.h \
$(framing)/Proxy.h \
+ shared_ptr.h \
Exception.h \
ExceptionHolder.h \
QpidError.h \
Modified: incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/framing/FramingContent.h Wed Mar 21 12:12:14 2007
@@ -30,6 +30,7 @@
bool isInline() const { return discriminator == INLINE; }
bool isReference() const { return discriminator == REFERENCE; }
const string& getValue() const { return value; }
+ void setValue(const string& newValue) { value = newValue; }
friend std::ostream& operator<<(std::ostream&, const Content&);
};
Added: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h?view=auto&rev=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h Wed Mar 21 12:12:14 2007
@@ -0,0 +1,31 @@
+#ifndef _common_shared_ptr_h
+#define _common_shared_ptr_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+/// Import shared_ptr into qpid namespace.
+using boost::shared_ptr;
+} // namespace qpid
+
+
+
+#endif /*!_common_shared_ptr_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/shared_ptr.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ClientChannelTest.cpp Wed Mar 21 12:12:14 2007
@@ -32,6 +32,10 @@
using namespace qpid::sys;
using namespace qpid::framing;
+/// Small frame size so we can create fragmented messages.
+const size_t FRAME_MAX = 256;
+
+
/**
* Test client API using an in-process broker.
*/
@@ -42,6 +46,8 @@
CPPUNIT_TEST(testGetNoContent);
CPPUNIT_TEST(testConsumeCancel);
CPPUNIT_TEST(testConsumePublished);
+ CPPUNIT_TEST(testGetFragmentedMessage);
+ CPPUNIT_TEST(testConsumeFragmentedMessage);
CPPUNIT_TEST_SUITE_END();
struct Listener: public qpid::client::MessageListener {
@@ -65,7 +71,8 @@
public:
ClientChannelTest()
- : qname("testq"), data("hello"),
+ : connection(FRAME_MAX),
+ qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
{
connection.openChannel(channel);
@@ -76,21 +83,21 @@
void testPublishGet() {
Message pubMsg(data);
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel.getBasic().get(getMsg, queue)); // Empty queue
+ CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
}
void testGetNoContent() {
Message pubMsg;
pubMsg.getHeaders().setString("hello", "world");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.getBasic().get(getMsg, queue));
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
CPPUNIT_ASSERT(getMsg.getData().empty());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
@@ -98,10 +105,10 @@
void testConsumeCancel() {
string tag; // Broker assigned
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
channel.start();
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.getBasic().publish(Message("a"), exchange, qname);
+ channel.publish(Message("a"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
Time deadline(now() + 1*TIME_SEC);
@@ -112,8 +119,8 @@
CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
- channel.getBasic().publish(Message("b"), exchange, qname);
- channel.getBasic().publish(Message("c"), exchange, qname);
+ channel.publish(Message("b"), exchange, qname);
+ channel.publish(Message("c"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 3) {
@@ -124,15 +131,15 @@
CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
- channel.getBasic().cancel(tag);
- channel.getBasic().publish(Message("d"), exchange, qname);
+ channel.cancel(tag);
+ channel.publish(Message("d"), exchange, qname);
CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
{
Mutex::ScopedLock l(listener.monitor);
CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
}
Message msg;
- CPPUNIT_ASSERT(channel.getBasic().get(msg, queue));
+ CPPUNIT_ASSERT(channel.get(msg, queue));
CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
}
@@ -140,9 +147,9 @@
void testConsumePublished() {
Message pubMsg("x");
pubMsg.getHeaders().setString("y", "z");
- channel.getBasic().publish(pubMsg, exchange, qname);
+ channel.publish(pubMsg, exchange, qname);
string tag;
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
channel.start();
{
@@ -155,8 +162,40 @@
listener.messages[0].getHeaders().getString("y"));
}
+ void testGetFragmentedMessage() {
+ string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
+ channel.publish(Message(longStr), exchange, qname);
+ // FIXME aconway 2007-03-21: Remove couts.
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ Message getMsg;
+ cout << "==== Fragmented get:" << endl
+ << connection.conversation << endl;
+ CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ }
-
+ void testConsumeFragmentedMessage() {
+ string xx(FRAME_MAX*2, 'x');
+ channel.publish(Message(xx), exchange, qname);
+ cout << "==== Fragmented publish:" << endl
+ << connection.conversation << endl;
+ channel.start();
+ string tag;
+ channel.consume(queue, tag, &listener);
+ string yy(FRAME_MAX*2, 'y');
+ channel.publish(Message(yy), exchange, qname);
+ {
+ Mutex::ScopedLock l(listener.monitor);
+ while (listener.messages.size() != 2)
+ CPPUNIT_ASSERT(listener.monitor.wait(1*TIME_SEC));
+ }
+ // FIXME aconway 2007-03-21:
+ cout << "==== Fragmented consme 2 messages:" << endl
+ << connection.conversation << endl;
+
+ CPPUNIT_ASSERT_EQUAL(xx, listener.messages[0].getData());
+ CPPUNIT_ASSERT_EQUAL(yy, listener.messages[1].getData());
+ }
};
// Make this test suite a plugin.
Modified: incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/InProcessBroker.h Wed Mar 21 12:12:14 2007
@@ -145,25 +145,30 @@
return out;
}
+} // namespace broker
-}} // namespace qpid::broker
-
+namespace client {
/** An in-process client+broker all in one. */
-class InProcessBrokerClient : public qpid::client::Connection {
+class InProcessBrokerClient : public client::Connection {
public:
- qpid::broker::InProcessBroker broker;
- qpid::broker::InProcessBroker::Conversation& conversation;
+ broker::InProcessBroker broker;
+ broker::InProcessBroker::Conversation& conversation;
/** Constructor creates broker and opens client connection. */
- InProcessBrokerClient(qpid::framing::ProtocolVersion version=
- qpid::framing::highestProtocolVersion
- ) : broker(version), conversation(broker.conversation)
+ InProcessBrokerClient(
+ u_int32_t max_frame_size=65536,
+ framing::ProtocolVersion version= framing::highestProtocolVersion
+ ) : client::Connection(false, max_frame_size, version),
+ broker(version),
+ conversation(broker.conversation)
{
setConnector(broker);
open("");
}
-
- ~InProcessBrokerClient() {}
};
+
+
+}} // namespace qpid::client
+
#endif // _tests_InProcessBroker_h
Modified: incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/ProducerConsumerTest.cpp Wed Mar 21 12:12:14 2007
@@ -30,8 +30,9 @@
#include "AMQP_HighestVersion.h"
#include "sys/AtomicCount.h"
-using namespace qpid::sys;
-using namespace qpid::framing;
+using namespace qpid;
+using namespace sys;
+using namespace framing;
using namespace boost;
using namespace std;
@@ -99,7 +100,7 @@
CPPUNIT_TEST_SUITE_END();
public:
- InProcessBrokerClient client;
+ client::InProcessBrokerClient client;
ProducerConsumer pc;
WatchedCounter stopped;
@@ -166,7 +167,7 @@
}
public:
- ProducerConsumerTest() : client(highestProtocolVersion) {}
+ ProducerConsumerTest() : client() {}
void testProduceConsume() {
ConsumeRunnable runMe(*this);
Modified: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp Wed Mar 21 12:12:14 2007
@@ -102,7 +102,7 @@
Monitor monitor;
SimpleListener listener(&monitor);
string tag("MyTag");
- channel.getBasic().consume(queue, tag, &listener);
+ channel.consume(queue, tag, &listener);
if (verbose) std::cout << "Registered consumer." << std::endl;
//we need to enable the message dispatching for this channel
@@ -115,7 +115,7 @@
Message msg;
string data("MyMessage");
msg.setData(data);
- channel.getBasic().publish(msg, exchange, "MyTopic");
+ channel.publish(msg, exchange, "MyTopic");
if (verbose) std::cout << "Published message: " << data << std::endl;
{
Modified: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp Wed Mar 21 12:12:14 2007
@@ -116,7 +116,7 @@
//Consume from the response queue, logging all echoed message to console:
LoggingListener listener;
std::string tag;
- channel.getBasic().consume(response, tag, &listener);
+ channel.consume(response, tag, &listener);
//Process incoming requests on a new thread
channel.start();
@@ -129,7 +129,7 @@
Message msg;
msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
msg.setData(text);
- channel.getBasic().publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
+ channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service);
std::cout << "Enter text to send:" << std::endl;
}
@@ -158,10 +158,10 @@
//Consume from the request queue, echoing back all messages received to the client that sent them
EchoServer server(&channel);
std::string tag = "server_tag";
- channel.getBasic().consume(request, tag, &server);
+ channel.consume(request, tag, &server);
//Process incoming requests on the main thread
- channel.getBasic().run();
+ channel.run();
connection.close();
} catch(qpid::QpidError error) {
@@ -184,7 +184,7 @@
std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
//'echo' the message back:
- channel->getBasic().publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
+ channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp Wed Mar 21 12:12:14 2007
@@ -119,9 +119,9 @@
//set up listener
Listener listener(&channel, response.getName(), args.getTransactional());
string tag;
- channel.getBasic().consume(control, tag, &listener, args.getAckMode());
+ channel.consume(control, tag, &listener, args.getAckMode());
cout << "topic_listener: Consuming." << endl;
- channel.getBasic().run();
+ channel.run();
connection.close();
cout << "topic_listener: normal exit" << endl;
return 0;
@@ -166,7 +166,7 @@
<< time/TIME_MSEC << " ms.";
Message msg(reportstr.str());
msg.getHeaders().setString("TYPE", "REPORT");
- channel->getBasic().publish(msg, string(), responseQueue);
+ channel->publish(msg, string(), responseQueue);
if(transactional){
channel->commit();
}
Modified: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp?view=diff&rev=520972&r1=520971&r2=520972
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp Wed Mar 21 12:12:14 2007
@@ -129,7 +129,7 @@
//set up listener
Publisher publisher(&channel, "topic_control", args.getTransactional());
std::string tag("mytag");
- channel.getBasic().consume(response, tag, &publisher, args.getAckMode());
+ channel.consume(response, tag, &publisher, args.getAckMode());
channel.start();
int batchSize(args.getBatches());
@@ -187,12 +187,13 @@
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
- channel->getBasic().publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(
+ msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
}
//send report request
Message reportRequest;
reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->getBasic().publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}
@@ -216,7 +217,7 @@
//send termination request
Message terminationRequest;
terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
- channel->getBasic().publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
+ channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic);
if(transactional){
channel->commit();
}