You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/06/03 18:08:33 UTC
svn commit: r781454 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/QueuePolicy.cpp qpid/cluster/Connection.cpp
tests/cluster_test.cpp
Author: gsim
Date: Wed Jun 3 16:08:33 2009
New Revision: 781454
URL: http://svn.apache.org/viewvc?rev=781454&view=rev
Log:
Ensure that a ring queue behaves as expected when replicated to a newly joined cluster node.
Altered the queueDurabilityPropagationToNewbie test so that it no longer uses an in-process broker, fixing an error caused by a linking change.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Jun 3 16:08:33 2009
@@ -187,11 +187,17 @@
RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+bool before(const QueuedMessage& a, const QueuedMessage& b)
+{
+ return a.position < b.position;
+}
+
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
QueuePolicy::enqueued(m);
qpid::sys::Mutex::ScopedLock l(lock);
- queue.push_back(m);
+ //need to insert in correct location based on position
+ queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jun 3 16:08:33 2009
@@ -332,6 +332,7 @@
if (!ended) { // Has a message
if (acquired) { // Message is on the update queue
m = getUpdateMessage();
+ m.queue = queue.get();
queue->enqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=781454&r1=781453&r2=781454&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jun 3 16:08:33 2009
@@ -704,7 +704,7 @@
*/
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
- ClusterFixture cluster(1, args);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0]);
c0.session.queueDeclare("durable_queue", arg::durable=true);
c0.session.queueDeclare("non_durable_queue", arg::durable=false);
@@ -712,6 +712,8 @@
Client c1(cluster[1]);
QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" );
QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+ BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+ BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
@@ -854,4 +856,79 @@
BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
}
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+ ScopedSuppressLogging allQuiet;
+ //tests that ring queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+
+ //add new node
+ cluster.add();
+
+ //send one more message
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag));
+
+ c1.session.close();
+ c1.connection.close();
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+ ScopedSuppressLogging allQuiet;
+ //tests that releasing a message that was unacked when one node
+ //joined works correctly
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org