You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/27 13:21:05 UTC

svn commit: r721166 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: gsim
Date: Thu Nov 27 04:21:04 2008
New Revision: 721166

URL: http://svn.apache.org/viewvc?rev=721166&view=rev
Log:
* QPID-1488: test that policy pointer is set
* don't flow to disk for null store implementation
* add checks for underflow in queue policy


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Thu Nov 27 04:21:04 2008
@@ -58,7 +58,7 @@
                     msg->flush(); // Can re-enter IncompleteMessageList::enqueueComplete
                 }
                 while (!msg->isEnqueueComplete())
-                       lock.wait();
+                    lock.wait();
             } else {
                 //leave the message as incomplete for now
                 return;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Nov 27 04:21:04 2008
@@ -22,6 +22,7 @@
 
 #include "Message.h"
 #include "MessageStore.h"
+#include "NullMessageStore.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/reply_exceptions.h"
 
@@ -72,7 +73,11 @@
     } else {
         message->getFrames().append(frame);
         //have we reached the staging limit? if so stage message and release content
-        if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
+        if (state == CONTENT 
+            && stagingThreshold 
+            && message->getFrames().getContentSize() >= stagingThreshold
+            && !NullMessageStore::isNullStore(store)) 
+        {
             message->releaseContent(store); 
             staging = true;
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov 27 04:21:04 2008
@@ -140,5 +140,15 @@
     out.insert(prepared.begin(), prepared.end());
 }
 
+bool NullMessageStore::isNull() const
+{
+    return true;
+}
+
+bool NullMessageStore::isNullStore(const MessageStore* store)
+{
+    const NullMessageStore* test = dynamic_cast<const NullMessageStore*>(store);
+    return test && test->isNull();
+}
 
 }}  // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov 27 04:21:04 2008
@@ -76,6 +76,9 @@
     virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue);
     virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}
+
+    virtual bool isNull() const;
+    static bool isNullStore(const MessageStore*);
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Nov 27 04:21:04 2008
@@ -24,6 +24,7 @@
 #include "Exchange.h"
 #include "DeliverableMessage.h"
 #include "MessageStore.h"
+#include "NullMessageStore.h"
 #include "QueueRegistry.h"
 
 #include "qpid/StringUtils.h"
@@ -741,12 +742,15 @@
 {
     buffer.putShortString(name);
     buffer.put(settings);
-    buffer.put(*policy);
+    if (policy.get()) { 
+        buffer.put(*policy);
+    }
 }
 
 uint32_t Queue::encodedSize() const
 {
-    return name.size() + 1/*short string size octet*/ + settings.encodedSize() + (*policy).encodedSize();
+    return name.size() + 1/*short string size octet*/ + settings.encodedSize() 
+        + (policy.get() ? (*policy).encodedSize() : 0);
 }
 
 Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer)
@@ -756,7 +760,9 @@
     std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
     buffer.get(result.first->settings);
     result.first->configure(result.first->settings);
-    buffer.get ( *(result.first->policy) );
+    if (result.first->policy.get()) {
+        buffer.get ( *(result.first->policy) );
+    }
     return result.first;
 }
 
@@ -828,7 +834,7 @@
 
 bool Queue::releaseMessageContent(const QueuedMessage& m)
 {
-    if (store) {
+    if (store && !NullMessageStore::isNullStore(store)) {
         QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
         m.payload->releaseContent(store);
         return true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Thu Nov 27 04:21:04 2008
@@ -20,6 +20,7 @@
  */
 #include "QueuePolicy.h"
 #include "Queue.h"
+#include "qpid/Exception.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
@@ -38,8 +39,23 @@
 
 void QueuePolicy::dequeued(uint64_t _size)
 {
-    if (maxCount) --count;
-    if (maxSize) size -= _size;
+    //Note: underflow detection is not reliable in the face of
+    //concurrent updates (at present locking in Queue.cpp prevents
+    //these anyway); updates are atomic and are safe regardless.
+    if (maxCount) {
+        if (count.get() > 0) {
+            --count;
+        } else {
+            throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
+        }
+    }
+    if (maxSize) {
+        if (_size > size.get()) {
+            throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
+        } else {
+            size -= _size;
+        }
+    }
 }
 
 bool QueuePolicy::checkLimit(const QueuedMessage& m)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Nov 27 04:21:04 2008
@@ -387,8 +387,9 @@
     cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
 
     if (!strategy.delivered) {
-        //TODO:if reject-unroutable, then reject
-        //else route to alternate exchange
+        //TODO:if discard-unroutable, just drop it
+        //TODO:else if accept-mode is explicit, reject it 
+        //else route it to alternate exchange
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=721166&r1=721165&r2=721166&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Nov 27 04:21:04 2008
@@ -82,6 +82,13 @@
     {
         return ops.empty();
     }
+
+    //don't treat this store as a null impl
+    bool isNull() const
+    {
+        return false;
+    }
+
 };
     
 QPID_AUTO_TEST_SUITE(MessageBuilderTestSuite)