You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/12/05 14:14:39 UTC

svn commit: r482639 - in /incubator/qpid/trunk/qpid/cpp/lib/broker: BrokerMessage.cpp BrokerMessage.h BrokerQueue.cpp BrokerQueue.h Makefile.am QueuePolicy.cpp QueuePolicy.h SessionHandlerImpl.cpp

Author: gsim
Date: Tue Dec  5 05:14:38 2006
New Revision: 482639

URL: http://svn.apache.org/viewvc?view=rev&rev=482639
Log:
Added queue policy class for controlling when message content should be released from memory.


Added:
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
    incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Tue Dec  5 05:14:38 2006
@@ -32,6 +32,7 @@
 using namespace boost;
 using namespace qpid::broker;
 using namespace qpid::framing;
+using namespace qpid::sys;
 
 Message::Message(const ConnectionToken* const _publisher, 
                  const string& _exchange, const string& _routingKey, 
@@ -100,6 +101,7 @@
     AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
     out->send(new AMQFrame(channel, headerBody));
 
+    Mutex::ScopedLock locker(contentLock);
     if (content.get()) content->send(out, channel, framesize);
 }
 
@@ -173,6 +175,7 @@
 
 void Message::encodeContent(Buffer& buffer)
 {
+    Mutex::ScopedLock locker(contentLock);
     if (content.get()) content->encode(buffer);
 }
 
@@ -183,6 +186,7 @@
 
 u_int32_t Message::encodedContentSize()
 {
+    Mutex::ScopedLock locker(contentLock);
     return content.get() ? content->size() : 0;
 }
 
@@ -200,6 +204,7 @@
 
 void Message::releaseContent(MessageStore* store)
 {
+    Mutex::ScopedLock locker(contentLock);
     if (!content.get() || content->size() > 0) {
         //set content to lazy loading mode (but only if there is stored content):
 
@@ -212,5 +217,6 @@
 
 void Message::setContent(std::auto_ptr<Content>& _content)
 {
+    Mutex::ScopedLock locker(contentLock);
     content = _content;
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h Tue Dec  5 05:14:38 2006
@@ -23,13 +23,14 @@
 
 #include <memory>
 #include <boost/shared_ptr.hpp>
-#include <ConnectionToken.h>
-#include <Content.h>
-#include <TxBuffer.h>
 #include <AMQContentBody.h>
 #include <AMQHeaderBody.h>
 #include <BasicHeaderProperties.h>
+#include <ConnectionToken.h>
+#include <Content.h>
 #include <OutputHandler.h>
+#include <Mutex.h>
+#include <TxBuffer.h>
 
 namespace qpid {
     namespace broker {
@@ -52,6 +53,7 @@
             std::auto_ptr<Content> content;
             u_int64_t size;
             u_int64_t persistenceId;
+            qpid::sys::Mutex contentLock;
 
             void sendContent(qpid::framing::OutputHandler* out, 
                              int channel, u_int32_t framesize);

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Tue Dec  5 05:14:38 2006
@@ -26,6 +26,7 @@
 
 using namespace qpid::broker;
 using namespace qpid::sys;
+using namespace qpid::framing;
 
 Queue::Queue(const string& _name, u_int32_t _autodelete, 
              MessageStore* const _store,
@@ -62,8 +63,7 @@
 }
 
 void Queue::recover(Message::shared_ptr& msg){
-    queueing = true;
-    messages.push(msg);
+    push(msg);
     if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
         msg->releaseContent(store);
     }
@@ -72,8 +72,7 @@
 void Queue::process(Message::shared_ptr& msg){
     Mutex::ScopedLock locker(lock);
     if(queueing || !dispatch(msg)){
-        queueing = true;
-        messages.push(msg);
+        push(msg);
     }
 }
 
@@ -116,7 +115,7 @@
     while(proceed){
         Mutex::ScopedLock locker(lock);
         if(!messages.empty() && dispatch(messages.front())){
-            messages.pop();
+            pop();
         }else{
             dispatching = false;
             proceed = false;
@@ -149,7 +148,7 @@
     Message::shared_ptr msg;
     if(!messages.empty()){
         msg = messages.front();
-        messages.pop();
+        pop();
     }
     return msg;
 }
@@ -157,10 +156,19 @@
 u_int32_t Queue::purge(){
     Mutex::ScopedLock locker(lock);
     int count = messages.size();
-    while(!messages.empty()) messages.pop();
+    while(!messages.empty()) pop();
     return count;
 }
 
+void Queue::pop(){
+    messages.pop();
+}
+
+void Queue::push(Message::shared_ptr& msg){
+    queueing = true;
+    messages.push(msg);
+}
+
 u_int32_t Queue::getMessageCount() const{
     Mutex::ScopedLock locker(lock);
     return messages.size();
@@ -190,8 +198,30 @@
     }
 }
 
-void Queue::create()
+namespace 
 {
+    const std::string qpidMaxSize("qpid.max_size");
+    const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+    //Note: currently field table only contain signed 32 bit ints, which
+    //      restricts the values that can be set on the queue policy.
+    u_int32_t maxCount(0);
+    try {
+        maxCount = settings.getInt(qpidMaxSize); 
+    } catch (FieldNotFoundException& ignore) {
+    }
+    u_int32_t maxSize(0);
+    try {
+        maxSize = settings.getInt(qpidMaxCount);
+    } catch (FieldNotFoundException& ignore) {
+    }
+    if (maxCount || maxSize) {
+        setPolicy(std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize)));
+    }
+ 
     if (store) {
         store->create(*this);
     }
@@ -202,4 +232,9 @@
     if (store) {
         store->destroy(*this);
     }
+}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+    policy = _policy;
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Tue Dec  5 05:14:38 2006
@@ -22,6 +22,7 @@
 #define _Queue_
 
 #include <vector>
+#include <memory>
 #include <queue>
 #include <boost/shared_ptr.hpp>
 #include <amqp_types.h>
@@ -29,7 +30,9 @@
 #include <ConnectionToken.h>
 #include <Consumer.h>
 #include <BrokerMessage.h>
+#include <FieldTable.h>
 #include <sys/Monitor.h>
+#include <QueuePolicy.h>
 
 namespace qpid {
     namespace broker {
@@ -41,6 +44,7 @@
         struct ExclusiveAccessException{};
 
         using std::string;
+
         /**
          * The brokers representation of an amqp queue. Messages are
          * delivered to a queue from where they can be dispatched to
@@ -62,9 +66,13 @@
             int64_t lastUsed;
             Consumer* exclusive;
             mutable u_int64_t persistenceId;
+            std::auto_ptr<QueuePolicy> policy;
 
+            void pop();
+            void push(Message::shared_ptr& msg);
             bool startDispatching();
             bool dispatch(Message::shared_ptr& msg);
+            void setPolicy(std::auto_ptr<QueuePolicy> policy);
 
         public:
             
@@ -77,7 +85,7 @@
                   const ConnectionToken* const owner = 0);
             ~Queue();
 
-            void create();
+            void create(const qpid::framing::FieldTable& settings);
             void destroy();
             /**
              * Informs the queue of a binding that should be cancelled on

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am Tue Dec  5 05:14:38 2006
@@ -59,6 +59,8 @@
   NullMessageStore.cpp				\
   NullMessageStore.h				\
   Prefetch.h					\
+  QueuePolicy.cpp				\
+  QueuePolicy.h					\
   QueueRegistry.cpp				\
   QueueRegistry.h				\
   RecoveryManager.cpp				\

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp?view=auto&rev=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp Tue Dec  5 05:14:38 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+
+using namespace qpid::broker;
+
+QueuePolicy::QueuePolicy(u_int32_t _maxCount, u_int64_t _maxSize) : maxCount(_maxCount), maxSize(_maxSize) {}
+
+void QueuePolicy::enqueued(Message::shared_ptr& msg, MessageStore* store)
+{
+    if (checkCount(msg) || checkSize(msg)) {
+        msg->releaseContent(store);
+    }
+}
+
+void QueuePolicy::dequeued(Message::shared_ptr& msg, MessageStore* /*store*/)
+{
+    if (maxCount) count--;
+    if (maxSize) size -= msg->contentSize();
+}
+
+bool QueuePolicy::checkCount(Message::shared_ptr& /*msg*/)
+{
+    return maxCount && ++count > maxCount;
+}
+
+bool QueuePolicy::checkSize(Message::shared_ptr& msg)
+{
+    return maxSize && (size += msg->contentSize()) > maxSize;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h?view=auto&rev=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h Tue Dec  5 05:14:38 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuePolicy_
+#define _QueuePolicy_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+    namespace broker {
+        class QueuePolicy
+        {
+            const u_int32_t maxCount;
+            const u_int64_t maxSize;
+            u_int32_t count;
+            u_int64_t size;
+            
+            bool checkCount(Message::shared_ptr& msg);
+            bool checkSize(Message::shared_ptr& msg);
+        public:
+            QueuePolicy(u_int32_t maxCount, u_int64_t maxSize);
+            void enqueued(Message::shared_ptr& msg, MessageStore* store);
+            void dequeued(Message::shared_ptr& msg, MessageStore* store);
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/QueuePolicy.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=482639&r1=482638&r2=482639
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/SessionHandlerImpl.cpp Tue Dec  5 05:14:38 2006
@@ -256,7 +256,7 @@
 
 void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, const string& name, 
                                                    bool passive, bool durable, bool exclusive, 
-                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& /*arguments*/){
+                                                   bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
 	queue = parent->getQueue(name, channel);
@@ -268,8 +268,8 @@
 	if (queue_created.second) { // This is a new queue
 	    parent->getChannel(channel)->setDefaultQueue(queue);
 
-            //create persistent record if required
-            queue_created.first->create();
+            //apply settings & create persistent record if required
+            queue_created.first->create(arguments);
 
 	    //add default binding:
 	    parent->exchanges->getDefault()->bind(queue, name, 0);