You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/11/11 14:15:46 UTC

svn commit: r834869 [2/2] - in /qpid/trunk/qpid/cpp: examples/messaging/ include/qpid/messaging/ src/ src/qpid/client/amqp0_10/ src/qpid/messaging/ src/tests/

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=834869&r1=834868&r2=834869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Nov 11 13:15:44 2009
@@ -21,6 +21,7 @@
 #include "unit_test.h"
 #include "test_tools.h"
 #include "BrokerFixture.h"
+#include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
 #include "qpid/messaging/ListContent.h"
 #include "qpid/messaging/ListView.h"
@@ -33,7 +34,9 @@
 #include "qpid/messaging/Session.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
+#include "qpid/framing/ExchangeQueryResult.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/sys/Time.h"
 #include <boost/assign.hpp>
 #include <boost/format.hpp>
@@ -48,6 +51,7 @@
 using namespace qpid::messaging;
 using namespace qpid;
 using qpid::broker::Broker;
+using qpid::framing::Uuid;
 
 struct BrokerAdmin
 {
@@ -80,6 +84,18 @@
         session.exchangeDelete(qpid::client::arg::exchange=name);
     }
 
+    bool checkQueueExists(const std::string& name)
+    {
+        return session.queueQuery(name).getQueue() == name;
+    }
+
+    bool checkExchangeExists(const std::string& name, std::string& type)
+    {
+        qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
+        type = result.getType();
+        return !result.getNotFound();
+    }
+
     ~BrokerAdmin()
     {
         session.close();
@@ -99,6 +115,19 @@
         session(connection.newSession()),
         admin(broker->getPort(Broker::TCP_TRANSPORT)) {}
 
+    void ping(const qpid::messaging::Address& address)
+    {
+        Receiver r = session.createReceiver(address);
+        Sender s = session.createSender(address);
+        Message out(Uuid(true).str());
+        s.send(out);
+        Message in;
+        BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
+        r.cancel();
+        s.cancel();
+    }
+
     ~MessagingFixture()
     {
         session.close();
@@ -178,6 +207,22 @@
     return data;
 }
 
+
+void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message")
+{
+    for (uint i = start; i < start + count; ++i) {
+        sender.send(Message((boost::format("%1%_%2%") % base % i).str()));
+    }
+}
+
+void receive(Receiver& receiver, uint count = 1, uint start = 1,
+             const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+{
+    for (uint i = start; i < start + count; ++i) {
+        BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
+    }
+}
+
 QPID_AUTO_TEST_CASE(testSimpleSendReceive)
 {
     QueueFixture fix;
@@ -212,15 +257,19 @@
 QPID_AUTO_TEST_CASE(testSenderError)
 {
     MessagingFixture fix;
-    //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
-    BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::framing::NotFoundException);
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+    BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress {create:receiver, type:queue}"),
+                      qpid::messaging::InvalidAddress);
 }
 
 QPID_AUTO_TEST_CASE(testReceiverError)
 {
     MessagingFixture fix;
-    //TODO: this is the wrong type for the exception; define explicit set in messaging namespace
-    BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::framing::NotFoundException);
+    ScopedSuppressLogging sl;
+    BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+    BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress {create:sender, type:queue}"),
+                      qpid::messaging::InvalidAddress);
 }
 
 QPID_AUTO_TEST_CASE(testSimpleTopic)
@@ -433,9 +482,7 @@
 {
     QueueFixture fix;
     Sender sender = fix.session.createSender(fix.queue);
-    for (uint i = 0; i < 10; ++i) {
-        sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
-    }
+    send(sender, 10);
     //Note: this test relies on 'inside knowledge' of the sender
     //implementation and the fact that the simple test case makes it
     //possible to predict when completion information will be sent to
@@ -445,12 +492,248 @@
     BOOST_CHECK_EQUAL(sender.pending(), 0u);
 
     Receiver receiver = fix.session.createReceiver(fix.queue);
-    for (uint i = 0; i < 10; ++i) {
-        BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str());
-    }
+    receive(receiver, 10);
     fix.session.acknowledge();
 }
 
+QPID_AUTO_TEST_CASE(testBrowse)
+{
+    QueueFixture fix;
+    Sender sender = fix.session.createSender(fix.queue);
+    send(sender, 10);
+    Receiver browser1 = fix.session.createReceiver(fix.queue + " {browse:true}");
+    receive(browser1, 10);
+    Receiver browser2 = fix.session.createReceiver(fix.queue + " {browse:true}");
+    receive(browser2, 10);
+    Receiver consumer = fix.session.createReceiver(fix.queue);
+    receive(consumer, 10);
+    fix.session.acknowledge();
+}
+
+struct QueueCreatePolicyFixture : public MessagingFixture
+{
+    qpid::messaging::Address address;
+
+    QueueCreatePolicyFixture(const std::string& a) : address(a) {}
+
+    void test()
+    {
+        ping(address);
+        BOOST_CHECK(admin.checkQueueExists(address.getName()));
+    }
+
+    ~QueueCreatePolicyFixture()
+    {
+        admin.deleteQueue(address.getName());    
+    }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
+{
+    QueueCreatePolicyFixture fix("# {create:always, type:queue}");
+    fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
+{
+    QueueCreatePolicyFixture fix("# {create:receiver, type:queue}");
+    Receiver r = fix.session.createReceiver(fix.address);
+    fix.test();
+    r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
+{
+    QueueCreatePolicyFixture fix("# {create:sender, type:queue}");
+    Sender s = fix.session.createSender(fix.address);
+    fix.test();
+    s.cancel();
+}
+
+struct ExchangeCreatePolicyFixture : public MessagingFixture
+{
+    qpid::messaging::Address address;
+    const std::string exchangeType;
+
+    ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) :
+        address(a), exchangeType(t) {}
+
+    void test()
+    {
+        ping(address);
+        std::string actualType;
+        BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType));
+        BOOST_CHECK_EQUAL(exchangeType, actualType);
+    }
+
+    ~ExchangeCreatePolicyFixture()
+    {
+        admin.deleteExchange(address.getName());    
+    }
+};
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
+{
+    ExchangeCreatePolicyFixture fix("# {create:always, type:topic, node-properties:{x-amqp0-10-exchange-type:topic}}",
+                                  "topic");
+    fix.test();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
+{
+    ExchangeCreatePolicyFixture fix("#/my-subject {create:receiver, type:topic, node-properties:{x-amqp0-10-exchange-type:fanout}}", "fanout");
+    Receiver r = fix.session.createReceiver(fix.address);
+    fix.test();
+    r.cancel();
+}
+
+QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
+{
+    ExchangeCreatePolicyFixture fix("#/my-subject {create:sender, type:topic, node-properties:{x-amqp0-10-exchange-type:direct}}", "direct");
+    Sender s = fix.session.createSender(fix.address);
+    fix.test();
+    s.cancel();
+}
+
+struct DeletePolicyFixture : public MessagingFixture
+{
+    enum Mode {RECEIVER, SENDER, ALWAYS, NEVER};
+
+    std::string getPolicy(Mode mode)
+    {
+        switch (mode) {
+          case SENDER:
+            return "{delete:sender}";
+          case RECEIVER:
+            return "{delete:receiver}";
+          case ALWAYS:
+            return "{delete:always}";
+          case NEVER:
+            return "{delete:never}";
+        }
+    }
+
+    void testAll()
+    {
+        test(RECEIVER);
+        test(SENDER);
+        test(ALWAYS);
+        test(NEVER);
+    }
+
+    virtual ~DeletePolicyFixture() {}
+    virtual void create(const qpid::messaging::Address&) = 0;
+    virtual void destroy(const qpid::messaging::Address&) = 0;
+    virtual bool exists(const qpid::messaging::Address&) = 0;
+
+    void test(Mode mode)
+    {
+        qpid::messaging::Address address("# " + getPolicy(mode));
+        create(address);
+
+        Sender s = session.createSender(address);
+        Receiver r = session.createReceiver(address);
+        switch (mode) {
+          case RECEIVER:
+            s.cancel();
+            BOOST_CHECK(exists(address));
+            r.cancel();
+            BOOST_CHECK(!exists(address));
+            break;
+          case SENDER:
+            r.cancel();
+            BOOST_CHECK(exists(address));
+            s.cancel();
+            BOOST_CHECK(!exists(address));
+            break;
+          case ALWAYS:
+            //Problematic case at present; multiple attempts to delete
+            //will result in all but one attempt failing and killing
+            //the session which is not desirable. TODO: better
+            //implementation of delete policy.
+            s.cancel();
+            BOOST_CHECK(!exists(address));
+            break;
+          case NEVER:
+            r.cancel();
+            BOOST_CHECK(exists(address));
+            s.cancel();
+            BOOST_CHECK(exists(address));
+            destroy(address);
+        }
+    }
+};
+
+struct QueueDeletePolicyFixture : DeletePolicyFixture
+{
+    void create(const qpid::messaging::Address& address)
+    {
+        admin.createQueue(address.getName());
+    }
+    void destroy(const qpid::messaging::Address& address)
+    {
+        admin.deleteQueue(address.getName());
+    }
+    bool exists(const qpid::messaging::Address& address)
+    {
+        return admin.checkQueueExists(address.getName());
+    }
+};
+
+struct ExchangeDeletePolicyFixture : DeletePolicyFixture
+{
+    const std::string exchangeType;
+    ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {}
+
+    void create(const qpid::messaging::Address& address)
+    {
+        admin.createExchange(address.getName(), exchangeType);
+    }
+    void destroy(const qpid::messaging::Address& address)
+    {
+        admin.deleteExchange(address.getName());
+    }
+    bool exists(const qpid::messaging::Address& address)
+    {
+        std::string actualType;
+        return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType;
+    }
+};
+
+QPID_AUTO_TEST_CASE(testDeletePolicyQueue)
+{
+    QueueDeletePolicyFixture fix;
+    fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
+{
+    ExchangeDeletePolicyFixture fix;
+    fix.testAll();
+}
+
+QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
+{
+    MessagingFixture fix;
+    std::string a1 = "q {create:always, assert:always, type:queue, node-properties:{durable:false, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+    Sender s1 = fix.session.createSender(a1);
+    s1.cancel();
+    Receiver r1 = fix.session.createReceiver(a1);
+    r1.cancel();
+    
+    std::string a2 = "q {assert:receiver, node-properties:{durable:true, x-amqp0-10-arguments:{qpid.max-count:100}}}";
+    Sender s2 = fix.session.createSender(a2);
+    s2.cancel();
+    BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
+
+    std::string a3 = "q {assert:sender, node-properties:{x-amqp0-10-arguments:{qpid.max-count:99}}}";
+    BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
+    Receiver r3 = fix.session.createReceiver(a3);
+    r3.cancel();
+
+    fix.admin.deleteQueue("q");
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org