You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/05/27 16:23:49 UTC
svn commit: r779183 - in /qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/cluster/ tests/
Author: gsim
Date: Wed May 27 14:23:49 2009
New Revision: 779183
URL: http://svn.apache.org/viewvc?rev=779183&view=rev
Log:
QPID-1488: Ensure that policy state (as well as store state and management stats) is accurate on newly joined nodes by informing
the queue of any logically enqueued messages that are currently acquired (but not yet accepted or
released).
QPID-1873: Ensure that the various properties of a queue (durability, exclusivity, etc.) are correctly replicated
to new cluster members.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May 27 14:23:49 2009
@@ -1006,3 +1006,16 @@
insertSeqNo = !seqNoKey.empty();
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
+
+void Queue::enqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
+ if (policy.get()) policy->tryEnqueue(m);
+ mgntEnqStats(m.payload);
+ if (m.payload->isPersistent()) {
+ enqueue ( 0, m.payload );
+ }
+ } else {
+ QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
+ }
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed May 27 14:23:49 2009
@@ -124,7 +124,7 @@
QueuedMessage getFront();
QueuedMessage& checkLvqReplace(QueuedMessage& msg);
void clearLVQIndex(const QueuedMessage& msg);
-
+
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
if (mgmtObject != 0) {
@@ -251,6 +251,14 @@
void dequeueCommitted(const QueuedMessage& msg);
/**
+ * Inform queue of messages that were enqueued, have since
+ * been acquired but not yet accepted or released (and
+ * thus are still logically on the queue) - used in
+ * clustered broker.
+ */
+ void enqueued(const QueuedMessage& msg);
+
+ /**
* Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed May 27 14:23:49 2009
@@ -284,7 +284,6 @@
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
-
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed May 27 14:23:49 2009
@@ -28,6 +28,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/StructHelper.h"
+#include <algorithm>
#include <vector>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
@@ -68,6 +69,12 @@
FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ template <class F> void eachExclusiveQueue(F f)
+ {
+ queueImpl.eachExclusiveQueue(f);
+ }
+
+
private:
//common base for utility methods etc that are specific to this adapter
struct HandlerHelper : public HandlerImpl
@@ -130,6 +137,10 @@
bool isLocal(const ConnectionToken* t) const;
void destroyExclusiveQueues();
+ template <class F> void eachExclusiveQueue(F f)
+ {
+ std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);
+ }
};
class MessageHandlerImpl :
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed May 27 14:23:49 2009
@@ -107,6 +107,7 @@
// Used by cluster to create replica sessions.
SemanticState& getSemanticState() { return semanticState; }
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ SessionAdapter& getSessionAdapter() { return adapter; }
bool processSendCredit(uint32_t msgs);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed May 27 14:23:49 2009
@@ -330,10 +330,12 @@
broker::QueuedMessage m;
broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) // Message is on the update queue
+ if (acquired) { // Message is on the update queue
m = getUpdateMessage();
- else // Message at original position in original queue
+ queue->enqueued(m); //inform queue of the message
+ } else { // Message at original position in original queue
m = queue->find(position);
+ }
if (!m.payload)
throw Exception(QPID_MSG("deliveryRecord no update message"));
}
@@ -344,11 +346,6 @@
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
semanticState().record(dr); // Part of the session's unacked list.
-
- // If the message was unacked, the newbie broker must place
- // it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && acquired && !ended)
- queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed May 27 14:23:49 2009
@@ -124,7 +124,7 @@
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
- b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
@@ -225,18 +225,35 @@
}
};
-void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(debug, updaterId << " updating queue " << q->getName());
- ClusterConnectionProxy proxy(session);
- proxy.queue(encode(*q));
- MessageUpdater updater(q->getName(), session, expiry);
+void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<Queue>& q) {
+ broker::Exchange::shared_ptr alternateExchange = q->getAlternateExchange();
+ s.queueDeclare(
+ arg::queue = q->getName(),
+ arg::durable = q->isDurable(),
+ arg::autoDelete = q->isAutoDelete(),
+ arg::alternateExchange = alternateExchange ? alternateExchange->getName() : "",
+ arg::arguments = q->getSettings(),
+ arg::exclusive = q->hasExclusiveOwner()
+ );
+ MessageUpdater updater(q->getName(), s, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
- q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
+ q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
}
+void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
+ QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+ updateQueue(shadowSession, q);
+}
-void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) {
- session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
+ if (!q->hasExclusiveOwner()) {
+ QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+ updateQueue(session, q);
+ }//else queue will be updated as part of session state of owning session
+}
+
+void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
+ s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
@@ -274,6 +291,9 @@
// Re-create session state on remote connection.
+ QPID_LOG(debug, updaterId << " updating exclusive queues.");
+ ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
+
// Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
QPID_LOG(debug, updaterId << " updating consumers.");
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Wed May 27 14:23:49 2009
@@ -81,11 +81,13 @@
void updateUnacked(const broker::DeliveryRecord&);
private:
- void updateQueue(const boost::shared_ptr<broker::Queue>&);
+ void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
+ void updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>&);
+ void updateExclusiveQueue(const boost::shared_ptr<broker::Queue>&);
void updateExchange(const boost::shared_ptr<broker::Exchange>&);
void updateMessage(const broker::QueuedMessage&);
void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
- void updateBinding(const std::string& queue, const broker::QueueBinding& binding);
+ void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
void updateTxState(broker::SemanticState& s);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed May 27 14:23:49 2009
@@ -28,6 +28,7 @@
#include "qpid/client/Session.h"
#include "qpid/client/FailoverListener.h"
#include "qpid/client/FailoverManager.h"
+#include "qpid/client/QueueOptions.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/UpdateClient.h"
@@ -770,4 +771,55 @@
fmgr.close();
}
+QPID_AUTO_TEST_CASE(testPolicyUpdate) {
+ ScopedSuppressLogging allQuiet;
+ //tests that the policy's internal state is accurate on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ QueueOptions options;
+ options.setSizePolicy(REJECT, 0, 2);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ c1.session.messageTransfer(arg::content=Message("one", "q"));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ c2.session.messageTransfer(arg::content=Message("two", "q"));
+
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=Message("three", "q")), framing::ResourceLimitExceededException);
+
+ Message received;
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+ BOOST_CHECK(c1.subs.get(received, "q"));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+ BOOST_CHECK(!c1.subs.get(received, "q"));
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
+ ScopedSuppressLogging allQuiet;
+ //tests that exclusive queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+ cluster.add();
+ Client c2(cluster[1], "c2");
+ QueueQueryResult result = c2.session.queueQuery("q");
+ BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+ BOOST_CHECK(result.getExclusive());
+ BOOST_CHECK(result.getAutoDelete());
+ BOOST_CHECK(!result.getDurable());
+ BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+ c1.connection.close();
+ c2.session = c2.connection.newSession();
+ BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+}
+
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org