You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/10/08 20:35:46 UTC

svn commit: r702958 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/client/QueueOptions.cpp qpid/client/QueueOptions.h tests/QueueOptionsTest.cpp tests/QueueTest.cpp

Author: cctrieloff
Date: Wed Oct  8 11:35:46 2008
New Revision: 702958

URL: http://svn.apache.org/viewvc?rev=702958&view=rev
Log:
QPID-1306

- added lvq support
- added lvq tests
- added safety test for lvq
- updated QueueOptions for lvq
- some refactoring of Queue, to have a single pop location


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Oct  8 11:35:46 2008
@@ -51,6 +51,21 @@
 using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+
+namespace // file-local string constants, moved up so popMsg()/push() can use them
+{
+    const std::string qpidMaxSize("qpid.max_size");
+    const std::string qpidMaxCount("qpid.max_count");
+    const std::string qpidNoLocal("no-local");
+    const std::string qpidTraceIdentity("qpid.trace.id");
+    const std::string qpidTraceExclude("qpid.trace.exclude");
+    const std::string qpidLastValueQueue("qpid.last_value_queue");
+    const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+    const std::string qpidPersistLastNode("qpid.persist_last_node");
+    const std::string qpidVQMatchProperty("qpid.LVQ_key"); // application-header field read as the LVQ key
+}
+
+
 Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
              const OwnershipToken* const _owner,
@@ -253,7 +268,7 @@
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {            
                     m = msg;
-                    messages.pop_front();
+                    popMsg(msg);
                     return true;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
@@ -371,7 +386,7 @@
 
     if(!messages.empty()){
         msg = messages.front();
-        messages.pop_front();
+        popMsg(msg);
     }
     return msg;
 }
@@ -406,22 +421,49 @@
         QueuedMessage qmsg = messages.front();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
-        messages.pop_front();
+        popMsg(qmsg);
         dequeue(0, qmsg);
         count++;
     }
     return count;
 }
 
+void Queue::popMsg(QueuedMessage& qmsg)
+{
+    // Single pop location: removes messages.front(). qmsg is presumably that
+    // same front message (true of the visible callers); its LVQ key is erased.
+    if (lastValueQueue) {
+        const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+        lvq.erase(ft ? ft->getString(qpidVQMatchProperty) : string());
+    }
+    messages.pop_front();
+}
+
 void Queue::push(boost::intrusive_ptr<Message>& msg){
     Listeners copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (policy.get()) policy->tryEnqueue(qm);
-
-        messages.push_back(qm);
-        listeners.swap(copy);
+
+        LVQ::iterator i = lvq.end();
+        string key;
+        if (lastValueQueue) {
+            // The headers pointer is checked before use (assumed null when a
+            // message has no application headers); such messages use key "".
+            const framing::FieldTable* ft = msg->getApplicationHeaders();
+            if (ft) key = ft->getString(qpidVQMatchProperty);
+            i = lvq.find(key);
+        }
+        if (i != lvq.end()) {
+            // LVQ hit: swap the payload of the queued entry in place; nothing
+            // is appended and listeners are left untouched, as before.
+            i->second->payload = msg;
+        } else {
+            messages.push_back(qm);
+            listeners.swap(copy);
+            if (lastValueQueue) lvq[key] = &messages.back();
+        }
     }
     for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
@@ -514,8 +556,8 @@
 void Queue::popAndDequeue()
 {
     QueuedMessage msg = messages.front();
-    messages.pop_front();
+    popMsg(msg); // shared pop path; also erases the LVQ index entry when lastValueQueue is set
     dequeue(0, msg);
 }
 
 /**
@@ -529,18 +571,6 @@
 }
 
 
-namespace 
-{
-    const std::string qpidMaxSize("qpid.max_size");
-    const std::string qpidMaxCount("qpid.max_count");
-    const std::string qpidNoLocal("no-local");
-    const std::string qpidTraceIdentity("qpid.trace.id");
-    const std::string qpidTraceExclude("qpid.trace.exclude");
-    const std::string qpidLastValueQueue("qpid.last_value_queue");
-    const std::string qpidOptimisticConsume("qpid.optimistic_consume");
-    const std::string qpidPersistLastNode("qpid.persist_last_node");
-}
-
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Oct  8 11:35:46 2008
@@ -65,6 +65,7 @@
 
             typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
+			typedef std::map<string,QueuedMessage*> LVQ;
 
             const string name;
             const bool autodelete;
@@ -81,6 +82,7 @@
             std::vector<std::string> traceExclude;
             Listeners listeners;
             Messages messages;
+			LVQ lvq;
             mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
             mutable qpid::sys::Mutex ownershipLock;
@@ -253,6 +255,9 @@
             }
 
             bool releaseMessageContent(const QueuedMessage&);
+
+            void popMsg(QueuedMessage& qmsg);
+
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp Wed Oct  8 11:35:46 2008
@@ -38,6 +38,7 @@
 const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
 const std::string QueueOptions::strOptimisticConsume("qpid.optimistic_consume");
 const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
+const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
 
 
 QueueOptions::~QueueOptions()
@@ -83,15 +84,17 @@
 void QueueOptions::setOrdering(QueueOrderingPolicy op)
 {
 	if (op == LVQ){
-	     // TODO, add and test options with LVQ patch.
-		 // also set the key match for LVQ
-	     //setString(LastValueQueue, 1); 
-	
+	    setInt(strLastValueQueue, 1); // stores "qpid.last_value_queue" = 1 in this option table
 	}else{
 	    clearOrdering();
 	}
 }
 
+void QueueOptions::getLVQKey(std::string& key)
+{
+    key = strLVQMatchProperty; // out-param receives the header field name "qpid.LVQ_key"
+}
+
 void QueueOptions::clearSizePolicy()
 {
     erase(strMaxCountKey);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h Wed Oct  8 11:35:46 2008
@@ -86,6 +86,11 @@
 	void clearPersistLastNode();
 	
 	/**
+	* Copies into key the name of the message header ("qpid.LVQ_key") whose value an LVQ matches on
+	*/
+	void getLVQKey(std::string& key);
+		
+	/**
 	* Use default odering policy
 	*/ 
 	void clearOrdering();
@@ -100,7 +105,7 @@
 	static const std::string strLastValueQueue;
 	static const std::string strOptimisticConsume;
 	static const std::string strPersistLastNode;
-	private:
+	static const std::string strLVQMatchProperty;
 	
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp Wed Oct  8 11:35:46 2008
@@ -63,15 +63,19 @@
 	
 	ft.setOptimisticConsume();
 	ft.setPersistLastNode();
+    ft.setOrdering(LVQ);
 	
     BOOST_CHECK(1 == ft.getInt(QueueOptions::strOptimisticConsume));
     BOOST_CHECK(1 == ft.getInt(QueueOptions::strPersistLastNode));
+    BOOST_CHECK(1 == ft.getInt(QueueOptions::strLastValueQueue));
 	
 	ft.clearOptimisticConsume();
 	ft.clearPersistLastNode();
+    ft.setOrdering(FIFO);
 
 	BOOST_CHECK(!ft.isSet(QueueOptions::strOptimisticConsume));
 	BOOST_CHECK(!ft.isSet(QueueOptions::strPersistLastNode));
+	BOOST_CHECK(!ft.isSet(QueueOptions::strLastValueQueue));
 
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Oct  8 11:35:46 2008
@@ -240,10 +240,8 @@
 
 QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
 
-    FieldTable args;
-
-    // set queue mode
-	args.setInt("qpid.persist_last_node", 1);
+    client::QueueOptions args;
+	args.setPersistLastNode();
 	
 	Queue::shared_ptr queue(new Queue("my-queue", true));
     queue->configure(args);
@@ -292,8 +290,8 @@
 
 QPID_AUTO_TEST_CASE(testOptimisticConsume){
 
-    FieldTable args;
-	args.setInt("qpid.persist_last_node", 1);
+    client::QueueOptions args;
+	args.setPersistLastNode();
 
     // set queue mode
 	
@@ -305,7 +303,7 @@
 	msg1->forcePersistent();
 
 	//change mode
-	args.setInt("qpid.optimistic_consume", 1);
+	args.setOptimisticConsume();
     queue->configure(args);
 	
 	//enqueue 1 message
@@ -322,6 +320,94 @@
 
 }
 
+QPID_AUTO_TEST_CASE(testLVQOrdering){
+
+    client::QueueOptions args;
+    // configure the queue as a last-value queue
+	args.setOrdering(client::LVQ);
+
+	Queue::shared_ptr queue(new Queue("my-queue", true ));
+	queue->configure(args);
+	
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "B");
+    intrusive_ptr<Message> msg3 = message("e", "C");
+    intrusive_ptr<Message> msg4 = message("e", "D");
+    intrusive_ptr<Message> received;
+
+    // tag the messages with LVQ keys a, b, c, a (msg4 reuses msg1's key)
+
+    string key;
+	args.getLVQKey(key);
+    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+	
+
+	msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+	msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+	msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+	msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+	
+	// enqueue 4 messages
+    queue->deliver(msg1);
+    queue->deliver(msg2);
+    queue->deliver(msg3);
+    queue->deliver(msg4);
+	// msg4 carries the same key as msg1, so only 3 entries are expected
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+	// msg4 is expected first: it took over msg1's slot at the front
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg4.get(), received.get());
+
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg2.get(), received.get());
+	
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg3.get(), received.get());
+
+    intrusive_ptr<Message> msg5 = message("e", "A");
+    intrusive_ptr<Message> msg6 = message("e", "B");
+    intrusive_ptr<Message> msg7 = message("e", "C");
+	msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+	msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+	msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    queue->deliver(msg5);
+    queue->deliver(msg6);
+    queue->deliver(msg7);
+	// the queue was drained above, so keys a, b, c enqueue as three new entries
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+	// with no duplicate keys the messages are expected in arrival order
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg5.get(), received.get());
+
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg6.get(), received.get());
+	
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg7.get(), received.get());
+	
+}
+
+QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
+
+// The LVQ index stores raw pointers to elements of a std::deque. This test guards
+// the assumption that push_back/pop_front never relocate the surviving elements.
+
+    std::deque<string> deq;
+	
+	string a;
+	string b;
+	
+	deq.push_back(a);
+	deq.push_back(b);
+	string* tmp = &deq.back();
+	for (int i = 0; i <= 100000; ++i){ // index renamed: it used to shadow string a
+	    string z;
+		deq.push_back(z);
+	}
+	deq.pop_front(); // drops a, so b must now be the front element
+    BOOST_CHECK_EQUAL(&deq.front(),tmp);
+
+}
 
 QPID_AUTO_TEST_SUITE_END()