You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/04/04 17:45:37 UTC
svn commit: r525542 - in /incubator/qpid/trunk/qpid/cpp/src: Makefile.am
client/ClientChannel.cpp client/ClientChannel.h
client/MessageMessageChannel.cpp client/MessageMessageChannel.h
tests/ClientChannelTest.cpp
Author: aconway
Date: Wed Apr 4 08:45:37 2007
New Revision: 525542
URL: http://svn.apache.org/viewvc?view=rev&rev=525542
Log:
* Made client::Channel bi-modal: 0-8 or 0-9 modes.
* Added dummy impl of client::MessageMessageChannel.
* Generalised ClientChannelTest to be able to test both modes.
Added:
incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Apr 4 08:45:37 2007
@@ -201,7 +201,8 @@
$(client)/ClientExchange.cpp \
$(client)/ClientQueue.cpp \
$(client)/BasicMessageChannel.cpp \
- $(client)/Connector.cpp \
+ $(client)/MessageMessageChannel.cpp \
+ $(client)/Connector.cpp \
$(client)/IncomingMessage.cpp \
$(client)/MessageListener.cpp \
$(client)/ResponseHandler.cpp \
@@ -245,10 +246,11 @@
$(client)/IncomingMessage.h \
$(client)/MessageChannel.h \
$(client)/BasicMessageChannel.h \
- $(client)/MessageListener.h \
- $(client)/MethodBodyInstances.h \
- $(client)/ResponseHandler.h \
- $(client)/ReturnedMessageHandler.h \
+ $(client)/MessageMessageChannel.h \
+ $(client)/MessageListener.h \
+ $(client)/MethodBodyInstances.h \
+ $(client)/ResponseHandler.h \
+ $(client)/ReturnedMessageHandler.h \
shared_ptr.h \
Exception.h \
ExceptionHolder.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.cpp Wed Apr 4 08:45:37 2007
@@ -26,8 +26,7 @@
#include "MethodBodyInstances.h"
#include "Connection.h"
#include "BasicMessageChannel.h"
-// FIXME aconway 2007-03-21:
-//#include "MessageMessageChannel.h"
+#include "MessageMessageChannel.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -39,14 +38,15 @@
using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(bool _transactional, u_int16_t _prefetch,
- MessageChannel* impl) :
- // FIXME aconway 2007-03-21: MessageMessageChannel
- messaging(impl ? impl : new BasicMessageChannel(*this)),
- connection(0),
- prefetch(_prefetch),
- transactional(_transactional)
-{ }
+/**
+ * Creates a channel whose messaging implementation matches the requested
+ * interop mode: BasicMessageChannel for AMQP_08, MessageMessageChannel
+ * for AMQP_09. The channel starts without a connection (connection == 0).
+ *
+ * Any other mode value is a programming error: it trips the assert when
+ * asserts are enabled and throws an INTERNAL_ERROR otherwise, so a Channel
+ * is never left without a messaging implementation.
+ */
+Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
+    connection(0), prefetch(_prefetch), transactional(_transactional)
+{
+    switch (mode) {
+      case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
+      case AMQP_09: messaging.reset(new MessageMessageChannel(*this)); break;
+      default:
+        assert(0);
+        // QPID_ERROR only builds the exception object; it must be thrown.
+        throw QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode.");
+    }
+}
Channel::~Channel(){
close();
Modified: incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/client/ClientChannel.h Wed Apr 4 08:45:37 2007
@@ -112,6 +112,7 @@
friend class MessageMessageChannel; // for sendAndReceive.
public:
+ enum InteropMode { AMQP_08, AMQP_09 };
/**
* Creates a channel object.
@@ -130,7 +131,7 @@
*/
Channel(
bool transactional = false, u_int16_t prefetch = 500,
- MessageChannel* messageImpl = 0);
+ InteropMode=AMQP_08);
~Channel();
Added: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp?view=auto&rev=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.cpp Wed Apr 4 08:45:37 2007
@@ -0,0 +1,331 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <boost/format.hpp>
+#include "MessageMessageChannel.h"
+#include "../framing/AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "../framing/FieldTable.h"
+#include "Connection.h"
+#include "../shared_ptr.h"
+
+namespace qpid {
+namespace client {
+
+using namespace std;
+using namespace sys;
+using namespace framing;
+
+/** Binds this messaging implementation to its channel; the tag counter starts at zero. */
+MessageMessageChannel::MessageMessageChannel(Channel& ch)
+    : channel(ch),
+      tagCount(0)
+{
+}
+
+/**
+ * Generates a new consumer tag of the form "__tag<N>".
+ * The counter is pre-incremented while the lock is held, so the first
+ * tag produced is "__tag1".
+ */
+string MessageMessageChannel::newTag() {
+    Mutex::ScopedLock guard(lock);
+    const long sequence = ++tagCount;
+    boost::format tagFormat("__tag%d");
+    tagFormat % sequence;
+    return tagFormat.str();
+}
+
+/**
+ * Subscribes to a queue by sending a MessageConsumeBody request and
+ * waiting for a MessageOkBody reply.
+ *
+ * @param queue    queue to consume from; only its name is sent.
+ * @param tag      consumer tag; when empty it is replaced by a generated one.
+ * @param ackMode  only the comparison with NO_ACK is sent in the request.
+ * @param noLocal  passed through to the request unchanged.
+ * @param fields   optional extra arguments; an empty table is sent when null.
+ *
+ * The listener and synch arguments are ignored for now: consumer
+ * registration is still commented out below.
+ */
+void MessageMessageChannel::consume(
+    Queue& queue, std::string& tag, MessageListener* /*listener*/,
+    AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields)
+{
+    if (tag.empty()) {
+        tag = newTag();
+    }
+    const bool noAck = (ackMode == NO_ACK);
+    const FieldTable arguments = fields ? *fields : FieldTable();
+    channel.sendAndReceive<MessageOkBody>(
+        new MessageConsumeBody(
+            channel.getVersion(), 0, queue.getName(), tag, noLocal,
+            noAck, false, arguments));
+
+//     // FIXME aconway 2007-02-20: Race condition!
+//     // We could receive the first message for the consumer
+//     // before we create the consumer below.
+//     // Move consumer creation to handler for MessageConsumeOkBody
+//     {
+//         Mutex::ScopedLock l(lock);
+//         ConsumerMap::iterator i = consumers.find(tag);
+//         if (i != consumers.end())
+//             THROW_QPID_ERROR(CLIENT_ERROR,
+//                              "Consumer already exists with tag="+tag);
+//         Consumer& c = consumers[tag];
+//         c.listener = listener;
+//         c.ackMode = ackMode;
+//         c.lastDeliveryTag = 0;
+//     }
+}
+
+
+void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) {
+    /* FIXME aconway 2007-02-23: not implemented yet.
+     * This is currently a no-op: both arguments are ignored and nothing is
+     * sent to the broker. The commented-out code below is the intended
+     * implementation, kept for reference. */
+//     Consumer c;
+//     {
+//         Mutex::ScopedLock l(lock);
+//         ConsumerMap::iterator i = consumers.find(tag);
+//         if (i == consumers.end())
+//             return;
+//         c = i->second;
+//         consumers.erase(i);
+//     }
+//     if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+//         channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+//     channel.sendAndReceiveSync<MessageCancelOkBody>(
+//         synch, new MessageCancelBody(channel.version, tag, !synch));
+}
+
+void MessageMessageChannel::close(){
+    /* FIXME aconway 2007-02-23: not implemented yet.
+     * This is currently a no-op: no outstanding acknowledgements are sent
+     * and `incoming` is not shut down. The commented-out code below is the
+     * intended implementation, kept for reference. */
+//     ConsumerMap consumersCopy;
+//     {
+//         Mutex::ScopedLock l(lock);
+//         consumersCopy = consumers;
+//         consumers.clear();
+//     }
+//     for (ConsumerMap::iterator i=consumersCopy.begin();
+//          i != consumersCopy.end(); ++i)
+//     {
+//         Consumer& c = i->second;
+//         if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+//             && c.lastDeliveryTag > 0)
+//         {
+//             channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true));
+//         }
+//     }
+//     incoming.shutdown();
+}
+
+
+/** Destination ID for the current get.
+ * Must not clash with a generated consumer ID.
+ * (newTag() generates IDs of the form "__tag<N>", so "__get__" cannot
+ * collide with a generated one.)
+ * At present it is only referenced from the commented-out code in get().
+ * TODO aconway 2007-03-06: support multiple outstanding gets?
+ */
+const string getDestinationId("__get__");
+
+bool MessageMessageChannel::get(
+    Message& , const Queue& , AckMode )
+{   /* Stub: the real implementation is still commented out below, so this
+     * takes the lock, ignores all three arguments and always returns false
+     * (no message is ever retrieved). */
+    Mutex::ScopedLock l(lock);
+//     incoming.addDestination(getDestinationId, getDest);
+//     channel.send(
+//         new MessageGetBody(
+//             channel.version, 0, queue.getName(), getDestinationId, ackMode));
+//     return getDest.wait(msg);
+    return false;
+}
+
+
+/** Convert a message to a transfer command.
+ *
+ * Builds a MessageTransferBody that carries the message data inline and
+ * copies the redelivered flag, priority, timestamp, delivery mode,
+ * message/correlation IDs, reply-to, content type and encoding, user and
+ * application IDs and headers from msg.
+ *
+ * Ticket, TTL and expiration are sent as 0, and the transaction ID and
+ * security token as empty strings (see the FIXMEs below). The exchange
+ * field is left empty because it is for broker use only.
+ *
+ * NOTE(review): the argument order must match the MessageTransferBody
+ * constructor, which is not visible here - verify before reordering.
+ */
+MessageTransferBody::shared_ptr makeTransfer(
+    ProtocolVersion version,
+    const Message& msg, const string& destination,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    return MessageTransferBody::shared_ptr(
+        new MessageTransferBody(
+            version,
+            0,                  // FIXME aconway 2007-04-03: ticket.
+            destination,
+            msg.isRedelivered(),
+            immediate,
+            0,                  // FIXME aconway 2007-02-23: ttl
+            msg.getPriority(),
+            msg.getTimestamp(),
+            static_cast<uint8_t>(msg.getDeliveryMode()),
+            0,                  // FIXME aconway 2007-04-03: Expiration
+            string(),           // Exchange: for broker use only.
+            routingKey,
+            msg.getMessageId(),
+            msg.getCorrelationId(),
+            msg.getReplyTo(),
+            msg.getContentType(),
+            msg.getContentEncoding(),
+            msg.getUserId(),
+            msg.getAppId(),
+            string(),           // FIXME aconway 2007-04-03: TransactionId
+            string(),           //FIXME aconway 2007-04-03: SecurityToken
+            msg.getHeaders(),
+            Content(INLINE, msg.getData()),
+            mandatory
+        ));
+}
+
+/**
+ * Publishes a message as a single MessageTransferBody and waits for a
+ * MessageOkBody reply.
+ *
+ * @param msg        message to send; its data is sent inline.
+ * @param exchange   target exchange; its name is the transfer destination.
+ * @param routingKey routing key placed in the transfer.
+ * @param mandatory  passed through to the transfer.
+ * @param immediate  passed through to the transfer.
+ *
+ * Throws an INTERNAL_ERROR if the channel has no connection, or if the
+ * transfer does not fit into one frame (references are not implemented).
+ */
+void MessageMessageChannel::publish(
+    const Message& msg, const Exchange& exchange,
+    const std::string& routingKey, bool mandatory, bool immediate)
+{
+    // A Channel is constructed with a null connection; fail cleanly rather
+    // than dereference it when publish() is called before the channel has
+    // been opened on a connection.
+    if (!channel.connection)
+        throw QPID_ERROR(INTERNAL_ERROR, "Channel is not open on a connection");
+
+    MessageTransferBody::shared_ptr transfer = makeTransfer(
+        channel.getVersion(),
+        msg, exchange.getName(), routingKey, mandatory, immediate);
+    // Frame itself uses 8 bytes.
+    u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
+    if (transfer->size() > frameMax) {
+        // FIXME aconway 2007-02-23:
+        throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
+    }
+    // NOTE(review): `transfer` stays owned by the shared_ptr while a raw
+    // pointer is handed over. consume() passes a freshly new'd body to the
+    // same call, which suggests sendAndReceive takes ownership - if so this
+    // is a double delete. Confirm against Channel::sendAndReceive.
+    channel.sendAndReceive<MessageOkBody>(transfer.get());
+}
+
+
+/**
+ * Dispatches an incoming method of the Message class.
+ *
+ * Reference open/append/close methods are forwarded to `incoming`,
+ * MessageEmptyBody is accepted and ignored for now, and every other
+ * method is rejected with Channel::UnknownMethod.
+ */
+void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
+    assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
+    switch(method->amqpMethodId()) {
+      case MessageOpenBody::METHOD_ID: {
+        MessageOpenBody::shared_ptr openBody =
+            shared_polymorphic_downcast<MessageOpenBody>(method);
+        incoming.openReference(openBody->getReference());
+        break;
+      }
+
+      case MessageAppendBody::METHOD_ID: {
+        MessageAppendBody::shared_ptr appendBody =
+            shared_polymorphic_downcast<MessageAppendBody>(method);
+        incoming.appendReference(
+            appendBody->getReference(), appendBody->getBytes());
+        break;
+      }
+
+      case MessageCloseBody::METHOD_ID: {
+        MessageCloseBody::shared_ptr closeBody =
+            shared_polymorphic_downcast<MessageCloseBody>(method);
+        incoming.closeReference(closeBody->getReference());
+        break;
+      }
+
+      case MessageEmptyBody::METHOD_ID:
+        // FIXME aconway 2007-04-04:
+        // getDest.empty();
+        break;
+
+      // FIXME aconway 2007-04-03: TODO - none of the methods below are
+      // handled yet; they all fall through to the default and are rejected.
+      case MessageCancelBody::METHOD_ID:
+      case MessageCheckpointBody::METHOD_ID:
+      case MessageOkBody::METHOD_ID:
+      case MessageOffsetBody::METHOD_ID:
+      case MessageQosBody::METHOD_ID:
+      case MessageRecoverBody::METHOD_ID:
+      case MessageRejectBody::METHOD_ID:
+      case MessageResumeBody::METHOD_ID:
+      case MessageTransferBody::METHOD_ID:
+      default:
+        throw Channel::UnknownMethod();
+    }
+}
+
+/** Header bodies are rejected: always throws an INTERNAL_ERROR. */
+void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr /*header*/)
+{
+    throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+
+/** Content bodies are rejected: always throws an INTERNAL_ERROR. */
+void MessageMessageChannel::handle(AMQContentBody::shared_ptr /*content*/)
+{
+    throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported");
+}
+
+// FIXME aconway 2007-02-23:
+// void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){
+// //record delivery tag:
+// consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+// //allow registered listener to handle the message
+// consumer.listener->received(msg);
+
+// if(channel.isOpen()){
+// bool multiple(false);
+// switch(consumer.ackMode){
+// case LAZY_ACK:
+// multiple = true;
+// if(++(consumer.count) < channel.getPrefetch())
+// break;
+// //else drop-through
+// case AUTO_ACK:
+// consumer.lastDeliveryTag = 0;
+// channel.send(
+// new MessageAckBody(
+// channel.version, msg.getDeliveryTag(), multiple));
+// case NO_ACK: // Nothing to do
+// case CLIENT_ACK: // User code must ack.
+// break;
+// // TODO aconway 2007-02-22: Provide a way for user
+// // to ack!
+// }
+// }
+
+// //as it stands, transactionality is entirely orthogonal to ack
+// //mode, though the acks will not be processed by the broker under
+// //a transaction until it commits.
+// }
+
+
+void MessageMessageChannel::run() {
+    /* FIXME aconway 2007-02-23: not implemented yet.
+     * This currently returns immediately without dispatching anything.
+     * The commented-out code below is the intended dispatch loop, kept
+     * for reference. */
+//     while(channel.isOpen()) {
+//         try {
+//             Message msg = incoming.waitDispatch();
+//             if(msg.getMethod()->isA<MessageReturnBody>()) {
+//                 ReturnedMessageHandler* handler=0;
+//                 {
+//                     Mutex::ScopedLock l(lock);
+//                     handler=returnsHandler;
+//                 }
+//                 if(handler == 0) {
+//                     // TODO aconway 2007-02-20: proper logging.
+//                     cout << "Message returned: " << msg.getData() << endl;
+//                 }
+//                 else
+//                     handler->returned(msg);
+//             }
+//             else {
+//                 MessageDeliverBody::shared_ptr deliverBody =
+//                     boost::shared_polymorphic_downcast<MessageDeliverBody>(
+//                         msg.getMethod());
+//                 std::string tag = deliverBody->getConsumerTag();
+//                 Consumer consumer;
+//                 {
+//                     Mutex::ScopedLock l(lock);
+//                     ConsumerMap::iterator i = consumers.find(tag);
+//                     if(i == consumers.end())
+//                         THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+//                                          "Unknown consumer tag=" + tag);
+//                     consumer = i->second;
+//                 }
+//                 deliver(consumer, msg);
+//             }
+//         }
+//         catch (const ShutdownException&) {
+//             /* Orderly shutdown */
+//         }
+//         catch (const Exception& e) {
+//             // FIXME aconway 2007-02-20: Report exception to user.
+//             cout << "client::Message::run() terminated by: " << e.toString()
+//                  << "(" << typeid(e).name() << ")" << endl;
+//         }
+//     }
+}
+
+/**
+ * Returned-message handlers are not supported by this implementation:
+ * the argument is ignored and an INTERNAL_ERROR is always thrown.
+ */
+void MessageMessageChannel::setReturnedMessageHandler(
+    ReturnedMessageHandler* /*handler*/)
+{
+    throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns");
+}
+
+/**
+ * Sends a MessageQosBody carrying the channel's prefetch setting and waits
+ * for a MessageOkBody reply. If the channel is transactional it then sends
+ * a TxSelectBody and waits for a TxSelectOkBody reply.
+ */
+void MessageMessageChannel::setQos() {
+    MessageQosBody* qos =
+        new MessageQosBody(channel.version, 0, channel.getPrefetch(), false);
+    channel.sendAndReceive<MessageOkBody>(qos);
+
+    if (!channel.isTransactional()) {
+        return;
+    }
+    channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+}
+
+}} // namespace qpid::client
Added: incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h?view=auto&rev=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/client/MessageMessageChannel.h Wed Apr 4 08:45:37 2007
@@ -0,0 +1,82 @@
+#ifndef _client_MessageMessageChannel_h
+#define _client_MessageMessageChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "MessageChannel.h"
+#include "IncomingMessage.h"
+#include "../sys/Monitor.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace client {
+/**
+ * Messaging implementation that uses the AMQP 0-9 Message class
+ * to send and receive messages.
+ *
+ * NOTE: in this revision cancel(), get(), run() and close() are still
+ * stubs; see MessageMessageChannel.cpp.
+ */
+class MessageMessageChannel : public MessageChannel
+{
+  public:
+    /** @param parent channel used to send requests and query settings. */
+    explicit MessageMessageChannel(Channel& parent);
+
+    /**
+     * Subscribe to a queue. If tag is empty it is replaced with a
+     * generated consumer tag.
+     */
+    void consume(
+        Queue& queue, std::string& tag, MessageListener* listener,
+        AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
+        const framing::FieldTable* fields = 0);
+
+    /** Cancel the consumer identified by tag. */
+    void cancel(const std::string& tag, bool synch = true);
+
+    /** Fetch a single message from queue; returns false if none was retrieved. */
+    bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
+
+    /** Publish msg to exchange with the given routing key. */
+    void publish(const Message& msg, const Exchange& exchange,
+                 const std::string& routingKey,
+                 bool mandatory = false, bool immediate = false);
+
+    /** Not supported by this implementation: always throws. */
+    void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+    /** Message dispatch loop. */
+    void run();
+
+    /** Handle an incoming method body of the Message class. */
+    void handle(boost::shared_ptr<framing::AMQMethodBody>);
+
+    /** Header bodies are not supported: always throws. */
+    void handle(shared_ptr<framing::AMQHeaderBody>);
+
+    /** Content bodies are not supported: always throws. */
+    void handle(shared_ptr<framing::AMQContentBody>);
+
+    /** Send the channel's prefetch (and transactional) settings to the broker. */
+    void setQos();
+
+    void close();
+
+  private:
+    typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
+    Destinations;
+
+    /** Generate a new consumer tag. */
+    std::string newTag();
+
+    sys::Mutex lock;          // Held while generating tags and in get().
+    Channel& channel;
+    IncomingMessage incoming;
+    long tagCount;            // Number of consumer tags generated so far.
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!_client_MessageMessageChannel_h*/
+
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp?view=diff&rev=525542&r1=525541&r2=525542
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp Wed Apr 4 08:45:37 2007
@@ -26,6 +26,7 @@
#include "../client/ClientQueue.h"
#include "../client/ClientExchange.h"
#include "../client/MessageListener.h"
+#include "../client/BasicMessageChannel.h"
using namespace std;
using namespace boost;
@@ -38,19 +39,12 @@
/**
- * Test client API using an in-process broker.
+ * Test base for client API using an in-process broker.
+ * The test base defines the test methods; derived classes
+ * instantiate the channel in Basic or Message mode.
*/
-class ClientChannelTest : public CppUnit::TestCase
+class ClientChannelTestBase : public CppUnit::TestCase
{
- CPPUNIT_TEST_SUITE(ClientChannelTest);
- CPPUNIT_TEST(testPublishGet);
- CPPUNIT_TEST(testGetNoContent);
- CPPUNIT_TEST(testConsumeCancel);
- CPPUNIT_TEST(testConsumePublished);
- CPPUNIT_TEST(testGetFragmentedMessage);
- CPPUNIT_TEST(testConsumeFragmentedMessage);
- CPPUNIT_TEST_SUITE_END();
-
struct Listener: public qpid::client::MessageListener {
vector<Message> messages;
Monitor monitor;
@@ -62,43 +56,48 @@
};
InProcessBrokerClient connection; // client::connection + local broker
- Channel channel;
const std::string qname;
const std::string data;
Queue queue;
Exchange exchange;
Listener listener;
+ protected:
+ boost::scoped_ptr<Channel> channel;
+
public:
- ClientChannelTest()
+ ClientChannelTestBase()
: connection(FRAME_MAX),
qname("testq"), data("hello"),
queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE)
- {
- connection.openChannel(channel);
- CPPUNIT_ASSERT(channel.getId() != 0);
- channel.declareQueue(queue);
+ {}
+
+ void setUp() {
+ CPPUNIT_ASSERT(channel);
+ connection.openChannel(*channel);
+ CPPUNIT_ASSERT(channel->getId() != 0);
+ channel->declareQueue(queue);
}
void testPublishGet() {
Message pubMsg(data);
pubMsg.getHeaders().setString("hello", "world");
- channel.publish(pubMsg, exchange, qname);
+ channel->publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(channel->get(getMsg, queue));
CPPUNIT_ASSERT_EQUAL(data, getMsg.getData());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
- CPPUNIT_ASSERT(!channel.get(getMsg, queue)); // Empty queue
+ CPPUNIT_ASSERT(!channel->get(getMsg, queue)); // Empty queue
}
void testGetNoContent() {
Message pubMsg;
pubMsg.getHeaders().setString("hello", "world");
- channel.publish(pubMsg, exchange, qname);
+ channel->publish(pubMsg, exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(channel->get(getMsg, queue));
CPPUNIT_ASSERT(getMsg.getData().empty());
CPPUNIT_ASSERT_EQUAL(string("world"),
getMsg.getHeaders().getString("hello"));
@@ -106,10 +105,10 @@
void testConsumeCancel() {
string tag; // Broker assigned
- channel.consume(queue, tag, &listener);
- channel.start();
+ channel->consume(queue, tag, &listener);
+ channel->start();
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.publish(Message("a"), exchange, qname);
+ channel->publish(Message("a"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
Time deadline(now() + 1*TIME_SEC);
@@ -120,8 +119,8 @@
CPPUNIT_ASSERT_EQUAL(size_t(1), listener.messages.size());
CPPUNIT_ASSERT_EQUAL(string("a"), listener.messages[0].getData());
- channel.publish(Message("b"), exchange, qname);
- channel.publish(Message("c"), exchange, qname);
+ channel->publish(Message("b"), exchange, qname);
+ channel->publish(Message("c"), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 3) {
@@ -132,15 +131,15 @@
CPPUNIT_ASSERT_EQUAL(string("b"), listener.messages[1].getData());
CPPUNIT_ASSERT_EQUAL(string("c"), listener.messages[2].getData());
- channel.cancel(tag);
- channel.publish(Message("d"), exchange, qname);
+ channel->cancel(tag);
+ channel->publish(Message("d"), exchange, qname);
CPPUNIT_ASSERT_EQUAL(size_t(3), listener.messages.size());
{
Mutex::ScopedLock l(listener.monitor);
CPPUNIT_ASSERT(!listener.monitor.wait(TIME_SEC/2));
}
Message msg;
- CPPUNIT_ASSERT(channel.get(msg, queue));
+ CPPUNIT_ASSERT(channel->get(msg, queue));
CPPUNIT_ASSERT_EQUAL(string("d"), msg.getData());
}
@@ -148,11 +147,11 @@
void testConsumePublished() {
Message pubMsg("x");
pubMsg.getHeaders().setString("y", "z");
- channel.publish(pubMsg, exchange, qname);
+ channel->publish(pubMsg, exchange, qname);
string tag;
- channel.consume(queue, tag, &listener);
+ channel->consume(queue, tag, &listener);
CPPUNIT_ASSERT_EQUAL(size_t(0), listener.messages.size());
- channel.start();
+ channel->start();
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 1)
@@ -165,19 +164,19 @@
void testGetFragmentedMessage() {
string longStr(FRAME_MAX*2, 'x'); // Longer than max frame size.
- channel.publish(Message(longStr), exchange, qname);
+ channel->publish(Message(longStr), exchange, qname);
Message getMsg;
- CPPUNIT_ASSERT(channel.get(getMsg, queue));
+ CPPUNIT_ASSERT(channel->get(getMsg, queue));
}
void testConsumeFragmentedMessage() {
string xx(FRAME_MAX*2, 'x');
- channel.publish(Message(xx), exchange, qname);
- channel.start();
+ channel->publish(Message(xx), exchange, qname);
+ channel->start();
string tag;
- channel.consume(queue, tag, &listener);
+ channel->consume(queue, tag, &listener);
string yy(FRAME_MAX*2, 'y');
- channel.publish(Message(yy), exchange, qname);
+ channel->publish(Message(yy), exchange, qname);
{
Mutex::ScopedLock l(listener.monitor);
while (listener.messages.size() != 2)
@@ -188,6 +187,23 @@
}
};
+/**
+ * Runs the tests defined by ClientChannelTestBase against a channel
+ * created in AMQP_08 mode.
+ */
+class BasicClientChannelTest : public ClientChannelTestBase {
+    CPPUNIT_TEST_SUITE(BasicClientChannelTest);
+    CPPUNIT_TEST(testPublishGet);
+    CPPUNIT_TEST(testGetNoContent);
+    CPPUNIT_TEST(testConsumeCancel);
+    CPPUNIT_TEST(testConsumePublished);
+    CPPUNIT_TEST(testGetFragmentedMessage);
+    CPPUNIT_TEST(testConsumeFragmentedMessage);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    BasicClientChannelTest() {
+        // Non-transactional channel with a prefetch of 500.
+        const bool transactional = false;
+        const u_int16_t prefetch = 500;
+        channel.reset(new Channel(transactional, prefetch, Channel::AMQP_08));
+    }
+};
+
+
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ClientChannelTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest);