You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/05/27 16:23:49 UTC

svn commit: r779183 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/cluster/ tests/

Author: gsim
Date: Wed May 27 14:23:49 2009
New Revision: 779183

URL: http://svn.apache.org/viewvc?rev=779183&view=rev
Log:
QPID-1488: Ensure policy state (+ store state & mgmt stats) is accurate on newly joined nodes by informing
           the queue of any logically enqueued messages that are currently acquired (but not accepted or
           released).

QPID-1873: Ensure that the various properties of a queue (durability, exclusivity, etc.) are correctly replicated
           to new cluster members.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed May 27 14:23:49 2009
@@ -1006,3 +1006,16 @@
     insertSeqNo = !seqNoKey.empty();
     QPID_LOG(debug, "Inserting sequence numbers as " << key);
 }
+
+void Queue::enqueued(const QueuedMessage& m)
+{
+    if (m.payload) {
+        if (policy.get()) policy->tryEnqueue(m);
+        mgntEnqStats(m.payload);
+        if (m.payload->isPersistent()) {
+            enqueue ( 0, m.payload );
+        }
+    } else {
+        QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
+    }
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed May 27 14:23:49 2009
@@ -124,7 +124,7 @@
             QueuedMessage getFront();
             QueuedMessage& checkLvqReplace(QueuedMessage& msg);
             void clearLVQIndex(const QueuedMessage& msg);
-            
+
             inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
             {
                 if (mgmtObject != 0) {
@@ -251,6 +251,14 @@
             void dequeueCommitted(const QueuedMessage& msg);
 
             /**
+             * Inform queue of messages that were enqueued, have since
+             * been acquired but not yet accepted or released (and
+             * thus are still logically on the queue) - used in
+             * clustered broker.  
+             */ 
+            void enqueued(const QueuedMessage& msg);
+            
+            /**
              * Gets the next available message 
              */
             QPID_BROKER_EXTERN QueuedMessage get();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed May 27 14:23:49 2009
@@ -284,7 +284,6 @@
         exclusiveQueues.erase(exclusiveQueues.begin());
     }
 }
-
     
 bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const 
 { 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed May 27 14:23:49 2009
@@ -28,6 +28,7 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/StructHelper.h"
 
+#include <algorithm>
 #include <vector>
 #include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
@@ -68,6 +69,12 @@
     FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
     StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
 
+    template <class F> void eachExclusiveQueue(F f) 
+    { 
+        queueImpl.eachExclusiveQueue(f);
+    }
+
+
   private:
     //common base for utility methods etc that are specific to this adapter
     struct HandlerHelper : public HandlerImpl 
@@ -130,6 +137,10 @@
         bool isLocal(const ConnectionToken* t) const; 
 
         void destroyExclusiveQueues();
+        template <class F> void eachExclusiveQueue(F f) 
+        { 
+            std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);
+        }
     };
 
     class MessageHandlerImpl :

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed May 27 14:23:49 2009
@@ -107,6 +107,7 @@
     // Used by cluster to create replica sessions.
     SemanticState& getSemanticState() { return semanticState; }
     boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+    SessionAdapter& getSessionAdapter() { return adapter; }
 
     bool processSendCredit(uint32_t msgs);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed May 27 14:23:49 2009
@@ -330,10 +330,12 @@
     broker::QueuedMessage m;
     broker::Queue::shared_ptr queue = findQueue(qname);
     if (!ended) {               // Has a message
-        if (acquired)           // Message is on the update queue
+        if (acquired) {         // Message is on the update queue
             m = getUpdateMessage();
-        else                    // Message at original position in original queue
+            queue->enqueued(m); //inform queue of the message 
+        } else {                // Message at original position in original queue
             m = queue->find(position);
+        }
         if (!m.payload)
             throw Exception(QPID_MSG("deliveryRecord no update message"));
     }
@@ -344,11 +346,6 @@
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
     semanticState().record(dr); // Part of the session's unacked list.
-
-    // If the message was unacked, the newbie broker must place
-    // it in its messageStore.
-    if ( m.payload && m.payload->isPersistent() && acquired && !ended)
-        queue->enqueue ( 0, m.payload );
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed May 27 14:23:49 2009
@@ -124,7 +124,7 @@
     QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
-    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
+    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
     // Update queue is used to transfer acquired messages that are no longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
@@ -225,18 +225,35 @@
     }
 };
 
-void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
-    QPID_LOG(debug, updaterId << " updating queue " << q->getName());
-    ClusterConnectionProxy proxy(session);
-    proxy.queue(encode(*q));
-    MessageUpdater updater(q->getName(), session, expiry);
+void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<Queue>& q) {
+    broker::Exchange::shared_ptr alternateExchange = q->getAlternateExchange();
+    s.queueDeclare(
+        arg::queue = q->getName(),
+        arg::durable = q->isDurable(),
+        arg::autoDelete = q->isAutoDelete(),
+        arg::alternateExchange = alternateExchange ? alternateExchange->getName() : "",
+        arg::arguments = q->getSettings(),
+        arg::exclusive = q->hasExclusiveOwner()
+    );
+    MessageUpdater updater(q->getName(), s, expiry);
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
-    q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
+    q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
 }
 
+void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
+    QPID_LOG(debug, updaterId << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId());
+    updateQueue(shadowSession, q);
+}
 
-void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) {
-    session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
+    if (!q->hasExclusiveOwner()) {
+        QPID_LOG(debug, updaterId << " updating queue " << q->getName());
+        updateQueue(session, q);
+    }//else queue will be updated as part of session state of owning session
+}
+
+void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
+    s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
 }
 
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
@@ -274,6 +291,9 @@
 
     // Re-create session state on remote connection.
 
+    QPID_LOG(debug, updaterId << " updating exclusive queues.");
+    ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1));
+
     // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
     QPID_LOG(debug, updaterId << " updating consumers.");
     ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Wed May 27 14:23:49 2009
@@ -81,11 +81,13 @@
     void updateUnacked(const broker::DeliveryRecord&);
 
   private:
-    void updateQueue(const boost::shared_ptr<broker::Queue>&);
+    void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
+    void updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>&);
+    void updateExclusiveQueue(const boost::shared_ptr<broker::Queue>&);
     void updateExchange(const boost::shared_ptr<broker::Exchange>&);
     void updateMessage(const broker::QueuedMessage&);
     void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
-    void updateBinding(const std::string& queue, const broker::QueueBinding& binding);
+    void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
     void updateTxState(broker::SemanticState& s);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=779183&r1=779182&r2=779183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed May 27 14:23:49 2009
@@ -28,6 +28,7 @@
 #include "qpid/client/Session.h"
 #include "qpid/client/FailoverListener.h"
 #include "qpid/client/FailoverManager.h"
+#include "qpid/client/QueueOptions.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/UpdateClient.h"
@@ -770,4 +771,55 @@
     fmgr.close();
 }
 
+QPID_AUTO_TEST_CASE(testPolicyUpdate) {
+    ScopedSuppressLogging allQuiet;
+    //tests that the policy's internal state is accurate on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    QueueOptions options;
+    options.setSizePolicy(REJECT, 0, 2);
+    c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+    c1.session.messageTransfer(arg::content=Message("one", "q"));
+    cluster.add();
+    Client c2(cluster[1], "c2");
+    c2.session.messageTransfer(arg::content=Message("two", "q"));
+    
+    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=Message("three", "q")), framing::ResourceLimitExceededException);
+
+    Message received;
+    BOOST_CHECK(c1.subs.get(received, "q"));
+    BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+    BOOST_CHECK(c1.subs.get(received, "q"));
+    BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+    BOOST_CHECK(!c1.subs.get(received, "q"));
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
+    ScopedSuppressLogging allQuiet;
+    //tests that exclusive queues are accurately replicated on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+    cluster.add();
+    Client c2(cluster[1], "c2");
+    QueueQueryResult result = c2.session.queueQuery("q");
+    BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+    BOOST_CHECK(result.getExclusive());
+    BOOST_CHECK(result.getAutoDelete());
+    BOOST_CHECK(!result.getDurable());
+    BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+    BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+    c1.connection.close();
+    c2.session = c2.connection.newSession();
+    BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+}
+
 QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org