You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/09/30 16:57:33 UTC

svn commit: r700489 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/cluster/ tests/

Author: cctrieloff
Date: Tue Sep 30 07:57:32 2008
New Revision: 700489

URL: http://svn.apache.org/viewvc?rev=700489&view=rev
Log:
QPID-1306

This patch includes:
- Optimistic consume
- Support for forcing queues durable when the cluster fails down to its last node
- Cleanup: management (mgnt) statistics updates in Queue moved into inline helpers
- Tests

Still to come:
- Header for client queue options
- LVQ (last value queue) support bits



Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Sep 30 07:57:32 2008
@@ -37,12 +37,18 @@
 
 TransferAdapter Message::TRANSFER;
 
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false),
+staged(false), forcePersistentPolicy(false), publisher(0), adapter(0) {}
 
 Message::~Message()
 {
 }
 
+void Message::forcePersistent()
+{
+    forcePersistentPolicy = true;
+}
+
 std::string Message::getRoutingKey() const
 {
     return getAdapter().getRoutingKey(frames);
@@ -73,7 +79,7 @@
 
 bool Message::isPersistent()
 {
-    return getAdapter().isPersistent(frames);
+    return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
 }
 
 bool Message::requiresAccept()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Sep 30 07:57:32 2008
@@ -133,6 +133,8 @@
 
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
+	
+	void forcePersistent();
 
   private:
     mutable sys::Mutex lock;
@@ -142,6 +144,7 @@
     bool redelivered;
     bool loaded;
     bool staged;
+	bool forcePersistentPolicy; // used to force message as durable, via a broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 30 07:57:32 2008
@@ -63,6 +63,10 @@
     consumerCount(0),
     exclusive(0),
     noLocal(false),
+    lastValueQueue(false),
+    optimisticConsume(false),
+    persistLastNode(false),
+	inLastNodeFailure(false),
     persistenceId(0),
     policyExceeded(false),
     mgmtObject(0)
@@ -134,21 +138,12 @@
     } else {
         // if no store then mark as enqueued
         if (!enqueue(0, msg)){
-            if (mgmtObject != 0) {
-                mgmtObject->inc_msgTotalEnqueues ();
-                mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
-            }
             push(msg);
             msg->enqueueComplete();
         }else {
-            if (mgmtObject != 0) {
-                mgmtObject->inc_msgTotalEnqueues ();
-                mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
-                mgmtObject->inc_msgPersistEnqueues ();
-                mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
-            }
-            push(msg);
+			push(msg);
         }
+        mgntEnqStats(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
     }
 }
@@ -157,12 +152,7 @@
 void Queue::recover(boost::intrusive_ptr<Message>& msg){
     push(msg);
     msg->enqueueComplete(); // mark the message as enqueued
-    if (mgmtObject != 0) {
-        mgmtObject->inc_msgTotalEnqueues ();
-        mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
-        mgmtObject->inc_msgPersistEnqueues ();
-        mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
-    }
+    mgntEnqStats(msg);
 
     if (store && !msg->isContentLoaded()) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -173,16 +163,11 @@
 
 void Queue::process(boost::intrusive_ptr<Message>& msg){
     push(msg);
-    if (mgmtObject != 0) {
-        mgmtObject->inc_msgTotalEnqueues ();
-        mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+    mgntEnqStats(msg);
+	if (mgmtObject != 0){
         mgmtObject->inc_msgTxnEnqueues ();
         mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
-        if (msg->isPersistent ()) {
-            mgmtObject->inc_msgPersistEnqueues ();
-            mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
-        }
-    }
+	}
 }
 
 void Queue::requeue(const QueuedMessage& msg){
@@ -466,20 +451,46 @@
     return autodelete && !consumerCount;
 }
 
+void Queue::clearLastNodeFailure()
+{
+    inLastNodeFailure = false;
+}
+
+void Queue::setLastNodeFailure()
+{
+    if (persistLastNode){
+		Mutex::ScopedLock locker(messageLock);
+    	for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+			i->payload->forcePersistent();
+			if (i->payload->getPersistenceId() == 0){
+            	enqueue(0, i->payload);
+			}
+    	}
+		inLastNodeFailure = true;
+	}
+}
+
 // return true if store exists, 
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
 {
-    if (traceId.size()) {
+    if (inLastNodeFailure && persistLastNode){
+	    msg->forcePersistent();
+	}
+	
+	if (traceId.size()) {
         msg->addTraceId(traceId);
     }
 
     if (msg->isPersistent() && store) {
-        msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-        boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+        if (optimisticConsume){
+		    msg->enqueueComplete(); // (optimistic) allow consume before written to disk
+        } else {
+		    msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+		}
+		boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
         return true;
     }
-    //msg->enqueueAsync();   // increments intrusive ptr cnt
     return false;
 }
 
@@ -492,12 +503,15 @@
         dequeued(msg);
     }
     if (msg.payload->isPersistent() && store) {
-        msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
-        boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
+        if (optimisticConsume) {
+		    msg.payload->dequeueComplete();
+        } else {
+            msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+        }
+		boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
         return true;
     }
-    //msg->dequeueAsync();   // decrements intrusive ptr cnt
     return false;
 }
 
@@ -519,14 +533,7 @@
 void Queue::dequeued(const QueuedMessage& msg)
 {
     if (policy.get()) policy->dequeued(msg);
-    if (mgmtObject != 0){
-        mgmtObject->inc_msgTotalDequeues  ();
-        mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
-        if (msg.payload->isPersistent ()){
-            mgmtObject->inc_msgPersistDequeues ();
-            mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
-        }
-    }
+    mgntDeqStats(msg.payload);
 }
 
 
@@ -537,6 +544,9 @@
     const std::string qpidNoLocal("no-local");
     const std::string qpidTraceIdentity("qpid.trace.id");
     const std::string qpidTraceExclude("qpid.trace.exclude");
+    const std::string qpidLastValueQueue("qpid.last_value_queue");
+    const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+    const std::string qpidPersistLastNode("qpid.persist_last_node");
 }
 
 void Queue::create(const FieldTable& _settings)
@@ -555,6 +565,15 @@
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
 
+    lastValueQueue= _settings.get(qpidLastValueQueue);
+    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue");
+
+    optimisticConsume= _settings.get(qpidOptimisticConsume);
+    if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume");
+    
+	persistLastNode= _settings.get(qpidPersistLastNode);
+    if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
+
     traceId = _settings.getString(qpidTraceIdentity);
     std::string excludeList = _settings.getString(qpidTraceExclude);
     if (excludeList.size()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Sep 30 07:57:32 2008
@@ -73,6 +73,10 @@
             uint32_t consumerCount;
             OwnershipToken* exclusive;
             bool noLocal;
+            bool lastValueQueue;
+            bool optimisticConsume;
+            bool persistLastNode;
+			bool inLastNodeFailure;
             std::string traceId;
             std::vector<std::string> traceExclude;
             Listeners listeners;
@@ -103,6 +107,26 @@
 
             void dequeued(const QueuedMessage& msg);
             void popAndDequeue();
+			inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg){
+                if (mgmtObject != 0) {
+                    mgmtObject->inc_msgTotalEnqueues ();
+                    mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+                    if (msg->isPersistent ()) {
+                	    mgmtObject->inc_msgPersistEnqueues ();
+                	    mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+					}
+                }
+			};
+			inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg){
+    			if (mgmtObject != 0){
+        			mgmtObject->inc_msgTotalDequeues  ();
+        			mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+        			if (msg->isPersistent ()){
+            			mgmtObject->inc_msgPersistDequeues ();
+            			mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+        			}
+    			}
+			};
 
         public:
 
@@ -178,6 +202,11 @@
             bool canAutoDelete() const;
             const QueueBindings& getBindings() const { return bindings; }
 
+            /**
+			* Used to take messages held in memory and flush them down to disk.
+			*/
+			void setLastNodeFailure();
+			void clearLastNodeFailure();
 
             bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
             /**

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Tue Sep 30 07:57:32 2008
@@ -27,7 +27,7 @@
 using namespace qpid::sys;
 
 QueueRegistry::QueueRegistry() :
-    counter(1), store(0), parent(0) {}
+    counter(1), store(0), parent(0), lastNode(false) {}
 
 QueueRegistry::~QueueRegistry(){}
 
@@ -43,6 +43,7 @@
     if (i == queues.end()) {
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
         queues[name] = queue;
+		if (lastNode) queue->setLastNodeFailure();
 
         return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
@@ -91,3 +92,17 @@
 MessageStore* QueueRegistry::getStore() const {
     return store;
 }
+
+void QueueRegistry::updateQueueClusterState(bool _lastNode)
+{
+    RWlock::ScopedRlock locker(lock);
+    for (QueueMap::iterator i = queues.begin(); i != queues.end(); i++) {
+        if (_lastNode){
+		    i->second->setLastNodeFailure();
+		} else {
+		    i->second->clearLastNodeFailure();
+		}
+    }
+	lastNode = _lastNode;
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Tue Sep 30 07:57:32 2008
@@ -107,6 +107,12 @@
         for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
             f(i->second);
     }
+	
+	/**
+	* Change queue mode when the cluster size drops to 1 node or expands again;
+	* in practice this allows queues to flow to disk when this is the last node running
+	*/
+	void updateQueueClusterState(bool lastNode);
     
 private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
@@ -115,6 +121,7 @@
     int counter;
     MessageStore* store;
     management::Manageable* parent;
+	bool lastNode; //used to set mode on queue declare
 
     //destroy impl that assumes lock is already held:
     void destroyLH (const string& name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Sep 30 07:57:32 2008
@@ -22,6 +22,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/QueueRegistry.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterDumpRequestBody.h"
 #include "qpid/framing/ClusterUpdateBody.h"
@@ -71,7 +72,8 @@
     handler(&joiningHandler),
     joiningHandler(*this),
     memberHandler(*this),
-    mcastId()
+    mcastId(),
+	lastSize(1)
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
@@ -332,7 +334,17 @@
 
 void Cluster::updateMemberStats() {
     if (mgmtObject) {
-        mgmtObject->set_clusterSize(size()); 
+        if (lastSize != size() && size() ==1){
+            QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size());
+		    broker.getQueues().updateQueueClusterState(true);
+			lastSize = size();
+		}else if (lastSize != size() && size() > 1) {
+            QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size());
+		    broker.getQueues().updateQueueClusterState(false);
+			lastSize = size();
+		}
+		
+		mgmtObject->set_clusterSize(size()); 
         std::vector<Url> vectUrl = getUrls();
         string urlstr;
         for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Sep 30 07:57:32 2008
@@ -164,6 +164,7 @@
     MemberHandler memberHandler;
 
     uint32_t mcastId;
+	size_t lastSize;
 
   friend class ClusterHandler;
   friend class JoiningHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=700489&r1=700488&r2=700489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Sep 30 07:57:32 2008
@@ -24,6 +24,7 @@
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/NullMessageStore.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include <iostream>
 #include "boost/format.hpp"
@@ -236,6 +237,95 @@
     exchange3->route(deliverable, key, &args);
 }
 
+QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
+
+    FieldTable args;
+
+    // set queue mode
+	args.setInt("qpid.persist_last_node", 1);
+	
+	Queue::shared_ptr queue(new Queue("my-queue", true));
+    queue->configure(args);
+	
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "B");
+    intrusive_ptr<Message> msg3 = message("e", "C");
+
+	//enqueue 2 messages
+    queue->deliver(msg1);
+    queue->deliver(msg2);
+	
+	//change mode
+	queue->setLastNodeFailure();
+	
+	//enqueue 1 message
+    queue->deliver(msg3);
+	
+	//check all messages are reported as persistent.
+    BOOST_CHECK(msg1->isPersistent());
+    BOOST_CHECK(msg2->isPersistent());
+    BOOST_CHECK(msg3->isPersistent());
+
+}
+
+class TestMessageStore : public NullMessageStore
+{
+  public:
+    
+    virtual void dequeue(TransactionContext*,
+                 const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+                 const PersistableQueue& /*queue*/)
+    {
+    }
+
+    virtual void enqueue(TransactionContext*,
+                 const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
+                 const PersistableQueue& /* queue */)
+    {
+    }
+
+    TestMessageStore() : NullMessageStore(false) {}
+    ~TestMessageStore(){}
+};
+
+
+QPID_AUTO_TEST_CASE(testOptimisticConsume){
+
+    FieldTable args;
+	args.setInt("qpid.persist_last_node", 1);
+
+    // set queue mode
+	
+	TestMessageStore store;
+	Queue::shared_ptr queue(new Queue("my-queue", true, &store));
+	queue->setLastNodeFailure();
+	
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "B");
+    intrusive_ptr<Message> msg3 = message("e", "C");
+	msg1->forcePersistent();
+	msg2->forcePersistent();
+	msg3->forcePersistent();
+
+	//enqueue 2 messages
+    queue->deliver(msg1);
+    queue->deliver(msg2);
+	
+	//change mode
+	args.setInt("qpid.optimistic_consume", 1);
+    queue->configure(args);
+	
+	//enqueue 1 message
+    queue->deliver(msg3);
+	
+	//check only msg3, delivered after enabling optimistic consume, is enqueue complete.
+    BOOST_CHECK(!msg1->isEnqueueComplete());
+    BOOST_CHECK(!msg2->isEnqueueComplete());
+    BOOST_CHECK(msg3->isEnqueueComplete());
+
+}
+
+
 QPID_AUTO_TEST_SUITE_END()