You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/11 14:15:46 UTC
svn commit: r834869 [2/2] - in /qpid/trunk/qpid/cpp: examples/messaging/
include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/
src/tests/
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Nov 11 13:15:44 2009
@@ -21,6 +21,7 @@
#include "unit_test.h"
#include "test_tools.h"
#include "BrokerFixture.h"
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/ListContent.h"
#include "qpid/messaging/ListView.h"
@@ -33,7 +34,9 @@
#include "qpid/messaging/Session.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
+#include "qpid/framing/ExchangeQueryResult.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/Time.h"
#include <boost/assign.hpp>
#include <boost/format.hpp>
@@ -48,6 +51,7 @@
using namespace qpid::messaging;
using namespace qpid;
using qpid::broker::Broker;
+using qpid::framing::Uuid;
struct BrokerAdmin
{
@@ -80,6 +84,18 @@
session.exchangeDelete(qpid::client::arg::exchange=name);
}
+ bool checkQueueExists(const std::string& name)
+ {
+ return session.queueQuery(name).getQueue() == name;
+ }
+
+ bool checkExchangeExists(const std::string& name, std::string& type)
+ {
+ qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
+ type = result.getType();
+ return !result.getNotFound();
+ }
+
~BrokerAdmin()
{
session.close();
@@ -99,6 +115,19 @@
session(connection.newSession()),
admin(broker->getPort(Broker::TCP_TRANSPORT)) {}
+ void ping(const qpid::messaging::Address& address)
+ {
+ Receiver r = session.createReceiver(address);
+ Sender s = session.createSender(address);
+ Message out(Uuid(true).str());
+ s.send(out);
+ Message in;
+ BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+ r.cancel();
+ s.cancel();
+ }
+
~MessagingFixture()
{
session.close();
@@ -178,6 +207,22 @@
return data;
}
+
+void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message")
+{
+ for (uint i = start; i < start + count; ++i) {
+ sender.send(Message((boost::format("%1%_%2%") % base % i).str()));
+ }
+}
+
+void receive(Receiver& receiver, uint count = 1, uint start = 1,
+ const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+ for (uint i = start; i < start + count; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
+ }
+}
+
QPID_AUTO_TEST_CASE(testSimpleSendReceive)
{
QueueFixture fix;
@@ -212,15 +257,19 @@
QPID_AUTO_TEST_CASE(testSenderError)
{
MessagingFixture fix;
- //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
- BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::framing::NotFoundException);
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress {create:receiver, type:queue}"),
+ qpid::messaging::InvalidAddress);
}
QPID_AUTO_TEST_CASE(testReceiverError)
{
MessagingFixture fix;
- //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
- BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::framing::NotFoundException);
+ ScopedSuppressLogging sl;
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress {create:sender, type:queue}"),
+ qpid::messaging::InvalidAddress);
}
QPID_AUTO_TEST_CASE(testSimpleTopic)
@@ -433,9 +482,7 @@
{
QueueFixture fix;
Sender sender = fix.session.createSender(fix.queue);
- for (uint i = 0; i < 10; ++i) {
- sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
- }
+ send(sender, 10);
//Note: this test relies on 'inside knowledge' of the sender
//implementation and the fact that the simple test case makes it
//possible to predict when completion information will be sent to
@@ -445,12 +492,248 @@
BOOST_CHECK_EQUAL(sender.pending(), 0u);
Receiver receiver = fix.session.createReceiver(fix.queue);
- for (uint i = 0; i < 10; ++i) {
- BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str());
- }
+ receive(receiver, 10);
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testBrowse)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ send(sender, 10);
+ Receiver browser1 = fix.session.createReceiver(fix.queue + " {browse:true}");
+ receive(browser1, 10);
+ Receiver browser2 = fix.session.createReceiver(fix.queue + " {browse:true}");
+ receive(browser2, 10);
+ Receiver consumer = fix.session.createReceiver(fix.queue);
+ receive(consumer, 10);
+ fix.session.acknowledge();
+}
+
+struct QueueCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+
+ QueueCreatePolicyFixture(const std::string& a) : address(a) {}
+
+ void test()
+ {
+ ping(address);
+ BOOST_CHECK(admin.checkQueueExists(address.getName()));
+ }
+
+ ~QueueCreatePolicyFixture()
+ {
+ admin.deleteQueue(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
+{
+ QueueCreatePolicyFixture fix("# {create:always, type:queue}");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
+{
+ QueueCreatePolicyFixture fix("# {create:receiver, type:queue}");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
+{
+ QueueCreatePolicyFixture fix("# {create:sender, type:queue}");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct ExchangeCreatePolicyFixture : public MessagingFixture
+{
+ qpid::messaging::Address address;
+ const std::string exchangeType;
+
+ ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) :
+ address(a), exchangeType(t) {}
+
+ void test()
+ {
+ ping(address);
+ std::string actualType;
+ BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType));
+ BOOST_CHECK_EQUAL(exchangeType, actualType);
+ }
+
+ ~ExchangeCreatePolicyFixture()
+ {
+ admin.deleteExchange(address.getName());
+ }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
+{
+ ExchangeCreatePolicyFixture fix("# {create:always, type:topic, node-properties:{x-amqp0-10-exchange-type:topic}}",
+ "topic");
+ fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject {create:receiver, type:topic, node-properties:{x-amqp0-10-exchange-type:fanout}}", "fanout");
+ Receiver r = fix.session.createReceiver(fix.address);
+ fix.test();
+ r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
+{
+ ExchangeCreatePolicyFixture fix("#/my-subject {create:sender, type:topic, node-properties:{x-amqp0-10-exchange-type:direct}}", "direct");
+ Sender s = fix.session.createSender(fix.address);
+ fix.test();
+ s.cancel();
+}
+
+struct DeletePolicyFixture : public MessagingFixture
+{
+ enum Mode {RECEIVER, SENDER, ALWAYS, NEVER};
+
+ std::string getPolicy(Mode mode)
+ {
+ switch (mode) {
+ case SENDER:
+ return "{delete:sender}";
+ case RECEIVER:
+ return "{delete:receiver}";
+ case ALWAYS:
+ return "{delete:always}";
+ case NEVER:
+ return "{delete:never}";
+ }
+ }
+
+ void testAll()
+ {
+ test(RECEIVER);
+ test(SENDER);
+ test(ALWAYS);
+ test(NEVER);
+ }
+
+ virtual ~DeletePolicyFixture() {}
+ virtual void create(const qpid::messaging::Address&) = 0;
+ virtual void destroy(const qpid::messaging::Address&) = 0;
+ virtual bool exists(const qpid::messaging::Address&) = 0;
+
+ void test(Mode mode)
+ {
+ qpid::messaging::Address address("# " + getPolicy(mode));
+ create(address);
+
+ Sender s = session.createSender(address);
+ Receiver r = session.createReceiver(address);
+ switch (mode) {
+ case RECEIVER:
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ r.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case SENDER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case ALWAYS:
+ //Problematic case at present; multiple attempts to delete
+ //will result in all but one attempt failing and killing
+ //the session which is not desirable. TODO: better
+ //implementation of delete policy.
+ s.cancel();
+ BOOST_CHECK(!exists(address));
+ break;
+ case NEVER:
+ r.cancel();
+ BOOST_CHECK(exists(address));
+ s.cancel();
+ BOOST_CHECK(exists(address));
+ destroy(address);
+ }
+ }
+};
+
+struct QueueDeletePolicyFixture : DeletePolicyFixture
+{
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createQueue(address.getName());
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteQueue(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ return admin.checkQueueExists(address.getName());
+ }
+};
+
+struct ExchangeDeletePolicyFixture : DeletePolicyFixture
+{
+ const std::string exchangeType;
+ ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {}
+
+ void create(const qpid::messaging::Address& address)
+ {
+ admin.createExchange(address.getName(), exchangeType);
+ }
+ void destroy(const qpid::messaging::Address& address)
+ {
+ admin.deleteExchange(address.getName());
+ }
+ bool exists(const qpid::messaging::Address& address)
+ {
+ std::string actualType;
+ return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType;
+ }
+};
+
+QPID_AUTO_TEST_CASE(testDeletePolicyQueue)
+{
+ QueueDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
+{
+ ExchangeDeletePolicyFixture fix;
+ fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
+{
+ MessagingFixture fix;
+ std::string a1 = "q {create:always, assert:always, type:queue, node-properties:{durable:false, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+ Sender s1 = fix.session.createSender(a1);
+ s1.cancel();
+ Receiver r1 = fix.session.createReceiver(a1);
+ r1.cancel();
+
+ std::string a2 = "q {assert:receiver, node-properties:{durable:true, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+ Sender s2 = fix.session.createSender(a2);
+ s2.cancel();
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
+
+ std::string a3 = "q {assert:sender, node-properties:{x-amqp0-10-arguments:{qpid.max-count:99}}}";
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
+ Receiver r3 = fix.session.createReceiver(a3);
+ r3.cancel();
+
+ fix.admin.deleteQueue("q");
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org