You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/06/03 18:08:33 UTC

svn commit: r781454 - in /qpid/trunk/qpid/cpp/src: qpid/broker/QueuePolicy.cpp qpid/cluster/Connection.cpp tests/cluster_test.cpp

Author: gsim
Date: Wed Jun  3 16:08:33 2009
New Revision: 781454

URL: http://svn.apache.org/viewvc?rev=781454&view=rev
Log:
Ensure that ring queue behaves as expected when replicated to newly joined cluster node.
Altered queueDurabilityPropagationToNewbie test to not use in-process broker to fix error caused by linking change.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Jun  3 16:08:33 2009
@@ -187,11 +187,17 @@
 RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : 
     QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
 
+bool before(const QueuedMessage& a, const QueuedMessage& b)
+{
+    return a.position < b.position;
+}
+
 void RingQueuePolicy::enqueued(const QueuedMessage& m)
 {
     QueuePolicy::enqueued(m);
     qpid::sys::Mutex::ScopedLock l(lock);
-    queue.push_back(m);
+    //need to insert in correct location based on position
+    queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
 }
 
 void RingQueuePolicy::dequeued(const QueuedMessage& m)

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jun  3 16:08:33 2009
@@ -332,6 +332,7 @@
     if (!ended) {               // Has a message
         if (acquired) {         // Message is on the update queue
             m = getUpdateMessage();
+            m.queue = queue.get();
             queue->enqueued(m); //inform queue of the message 
         } else {                // Message at original position in original queue
             m = queue->find(position);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jun  3 16:08:33 2009
@@ -704,7 +704,7 @@
     */
     ClusterFixture::Args args;
     prepareArgs(args, durableFlag);
-    ClusterFixture cluster(1, args);
+    ClusterFixture cluster(1, args, -1);
     Client c0(cluster[0]);
     c0.session.queueDeclare("durable_queue",     arg::durable=true);
     c0.session.queueDeclare("non_durable_queue", arg::durable=false);
@@ -712,6 +712,8 @@
     Client c1(cluster[1]);
     QueueQueryResult durable_query     = c1.session.queueQuery ( "durable_queue" );
     QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+    BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+    BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
     
     BOOST_CHECK_EQUAL ( durable_query.getDurable(),     true  );
     BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
@@ -854,4 +856,79 @@
     BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
 }
 
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+    ScopedSuppressLogging allQuiet;
+    //tests that ring queues are accurately replicated on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    QueueOptions options;
+    options.setSizePolicy(RING, 0, 5);
+    c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+    for (int i = 0; i < 5; i++) {
+        c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+    }
+    //receive but don't ack a message
+    LocalQueue lq;
+    SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+    lqSettings.autoAck = 0;
+    Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+    c1.session.messageFlush("q");
+
+    //add new node
+    cluster.add();
+
+    //send one more message
+    c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag));
+
+    c1.session.close();
+    c1.connection.close();
+
+    //check state of queue on both nodes
+    vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
+    Client c3(cluster[0], "c3");
+    BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+    Client c2(cluster[1], "c2");
+    BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+    ScopedSuppressLogging allQuiet;
+    //tests that releasing a messages that was unacked when one node
+    //joined works correctly
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    c1.session.queueDeclare("q", arg::durable=durableFlag);
+    for (int i = 0; i < 5; i++) {
+        c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+    }
+    //receive but don't ack a message
+    LocalQueue lq;
+    SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+    lqSettings.autoAck = 0;
+    Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+    c1.session.messageFlush("q");
+    Message received;
+    BOOST_CHECK(lq.get(received));
+    BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+    //add new node
+    cluster.add();
+
+    lqSub.release(lqSub.getUnaccepted());
+
+    //check state of queue on both nodes
+    vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+    Client c3(cluster[0], "c3");
+    BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+    Client c2(cluster[1], "c2");
+    BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
 QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org