You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [1/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Author: gsim
Date: Fri Aug 10 12:04:27 2012
New Revision: 1371676

URL: http://svn.apache.org/viewvc?rev=1371676&view=rev
Log:
QPID-4178: broker refactoring

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.h
      - copied, changed from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.h
      - copied, changed from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MapHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp
      - copied, changed from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCursor.h
      - copied, changed from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
    qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp
Removed:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
    qpid/trunk/qpid/cpp/src/qpid/replication/constants.h
    qpid/trunk/qpid/cpp/src/replication.mk
    qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/tests/ReplicationTest.cpp
    qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
    qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
    qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
    qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
    qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
    qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/TxMocks.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/src/tests/test_store.cpp
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Aug 10 12:04:27 2012
@@ -1110,7 +1110,6 @@ set (qpidbroker_SOURCES
      qpid/broker/Exchange.cpp
      qpid/broker/ExpiryPolicy.cpp
      qpid/broker/Fairshare.cpp
-     qpid/broker/LegacyLVQ.cpp
      qpid/broker/MessageDeque.cpp
      qpid/broker/MessageMap.cpp
      qpid/broker/PriorityQueue.cpp
@@ -1137,6 +1136,8 @@ set (qpidbroker_SOURCES
      qpid/broker/HeadersExchange.cpp
      qpid/broker/Link.cpp
      qpid/broker/LinkRegistry.cpp
+     qpid/broker/LossyQueue.cpp
+     qpid/broker/Lvq.cpp
      qpid/broker/Message.cpp
      qpid/broker/MessageAdapter.cpp
      qpid/broker/MessageBuilder.cpp
@@ -1145,9 +1146,11 @@ set (qpidbroker_SOURCES
      qpid/broker/NullMessageStore.cpp
      qpid/broker/QueueBindings.cpp
      qpid/broker/QueuedMessage.cpp
-     qpid/broker/QueueEvents.cpp
-     qpid/broker/QueuePolicy.cpp
+     qpid/broker/QueueCursor.cpp
+     qpid/broker/QueueDepth.cpp
+     qpid/broker/QueueFactory.cpp
      qpid/broker/QueueRegistry.cpp
+     qpid/broker/QueueSettings.cpp
      qpid/broker/QueueFlowLimit.cpp
      qpid/broker/RecoveryManagerImpl.cpp
      qpid/broker/RecoveredEnqueue.cpp
@@ -1170,8 +1173,8 @@ set (qpidbroker_SOURCES
      qpid/broker/TopicExchange.cpp
      qpid/broker/TxAccept.cpp
      qpid/broker/TxBuffer.cpp
-     qpid/broker/TxPublish.cpp
      qpid/broker/Vhost.cpp
+     qpid/broker/amqp_0_10/MessageTransfer.cpp
      qpid/management/ManagementAgent.cpp
      qpid/management/ManagementDirectExchange.cpp
      qpid/management/ManagementTopicExchange.cpp
@@ -1419,45 +1422,6 @@ install (FILES ${qmfconsole_HEADERS}
          COMPONENT ${QPID_COMPONENT_QMF})
 install_pdb (qmfconsole ${QPID_COMPONENT_QMF})
 
-# A queue event listener plugin that creates messages on a replication
-# queue corresponding to enqueue and dequeue events:
-set (replicating_listener_SOURCES
-     qpid/replication/constants.h
-     qpid/replication/ReplicatingEventListener.cpp
-     qpid/replication/ReplicatingEventListener.h
-    )
-add_msvc_version (replicating_listener library dll)
-add_library (replicating_listener MODULE ${replicating_listener_SOURCES})
-target_link_libraries (replicating_listener qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
-set_target_properties (replicating_listener PROPERTIES PREFIX "")
-if (CMAKE_COMPILER_IS_GNUCXX)
-  set_target_properties(replicating_listener PROPERTIES
-                        LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
-endif (CMAKE_COMPILER_IS_GNUCXX)
-install (TARGETS replicating_listener
-         DESTINATION ${QPIDD_MODULE_DIR}
-         COMPONENT ${QPID_COMPONENT_BROKER})
-
-# A custom exchange plugin that allows an exchange to be created that
-# can process the messages from a replication queue (populated on the
-# source system by the replicating listener plugin above) and take the
-# corresponding action on the local queues
-set (replication_exchange_SOURCES
-     qpid/replication/constants.h
-     qpid/replication/ReplicationExchange.cpp
-     qpid/replication/ReplicationExchange.h
-    )
-add_msvc_version (replication_exchange library dll)
-add_library (replication_exchange MODULE ${replication_exchange_SOURCES})
-target_link_libraries (replication_exchange qpidbroker)
-set_target_properties (replication_exchange PROPERTIES PREFIX "")
-if (CMAKE_COMPILER_IS_GNUCXX)
-  set_target_properties(replication_exchange PROPERTIES
-                        LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
-endif (CMAKE_COMPILER_IS_GNUCXX)
-install (TARGETS replication_exchange
-         DESTINATION ${QPIDD_MODULE_DIR}
-         COMPONENT ${QPID_COMPONENT_BROKER})
 
 # This is only really needed until all the trunk builds (Linux, UNIX, Windows)
 # are all on cmake only. This is because cmake builds always have a config.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Aug 10 12:04:27 2012
@@ -223,7 +223,6 @@ include qmfc.mk
 if HAVE_XML
 include xml.mk
 endif
-include replication.mk
 
 if RDMA
 
@@ -334,6 +333,7 @@ libqpidcommon_la_LIBADD = \
   -lboost_program_options \
   -lboost_filesystem \
   -luuid \
+  -lpthread \
   $(LIB_DLOPEN) \
   $(LIB_CLOCK_GETTIME)
 
@@ -553,7 +553,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliverableMessage.h \
-  qpid/broker/DeliveryAdapter.h \
   qpid/broker/DeliveryId.h \
   qpid/broker/DeliveryRecord.cpp \
   qpid/broker/DeliveryRecord.h \
@@ -584,12 +583,14 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/HeadersExchange.h \
   qpid/broker/AsyncCompletion.h \
-  qpid/broker/LegacyLVQ.h \
-  qpid/broker/LegacyLVQ.cpp \
+  qpid/broker/IndexedDeque.h \
   qpid/broker/Link.cpp \
   qpid/broker/Link.h \
   qpid/broker/LinkRegistry.cpp \
   qpid/broker/LinkRegistry.h \
+  qpid/broker/Lvq.h \
+  qpid/broker/Lvq.cpp \
+  qpid/broker/MapHandler.h \
   qpid/broker/Message.cpp \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.cpp \
@@ -624,19 +625,25 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueBindings.h \
   qpid/broker/QueueCleaner.cpp \
   qpid/broker/QueueCleaner.h \
-  qpid/broker/QueueEvents.cpp \
-  qpid/broker/QueueEvents.h \
+  qpid/broker/QueueCursor.h \
+  qpid/broker/QueueCursor.cpp \
+  qpid/broker/QueueDepth.h \
+  qpid/broker/QueueDepth.cpp \
+  qpid/broker/QueueFactory.h \
+  qpid/broker/QueueFactory.cpp \
+  qpid/broker/QueueSettings.h \
+  qpid/broker/QueueSettings.cpp \
   qpid/broker/QueueListeners.cpp \
   qpid/broker/QueueListeners.h \
   qpid/broker/QueueObserver.h \
-  qpid/broker/QueuePolicy.cpp \
-  qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.cpp \
   qpid/broker/QueueRegistry.h \
   qpid/broker/QueuedMessage.cpp \
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueueFlowLimit.h \
   qpid/broker/QueueFlowLimit.cpp \
+  qpid/broker/LossyQueue.h \
+  qpid/broker/LossyQueue.cpp \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \
   qpid/broker/RecoverableMessage.h \
@@ -687,9 +694,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/TxBuffer.cpp \
   qpid/broker/TxBuffer.h \
   qpid/broker/TxOp.h \
-  qpid/broker/TxOpVisitor.h \
-  qpid/broker/TxPublish.cpp \
-  qpid/broker/TxPublish.h \
   qpid/broker/Vhost.cpp \
   qpid/broker/Vhost.h \
   qpid/broker/MessageDistributor.h \
@@ -697,6 +701,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/FifoDistributor.cpp \
   qpid/broker/MessageGroupManager.cpp \
   qpid/broker/MessageGroupManager.h \
+  qpid/broker/amqp_0_10/MessageTransfer.h \
+  qpid/broker/amqp_0_10/MessageTransfer.cpp \
   qpid/management/ManagementAgent.cpp \
   qpid/management/ManagementAgent.h \
   qpid/management/ManagementDirectExchange.cpp \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h Fri Aug 10 12:04:27 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "qpid/RefCounted.h"
 #include <boost/intrusive_ptr.hpp>
 
 #include "qpid/broker/BrokerImportExport.h"
@@ -77,7 +78,7 @@ namespace broker {
  * assuming no need for synchronization with Completer threads.
  */
 
-class AsyncCompletion
+class AsyncCompletion : public virtual RefCounted
 {
  public:
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Aug 10 12:04:27 2012
@@ -33,6 +33,7 @@
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/MessageGroupManager.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
@@ -120,7 +121,6 @@ Broker::Options::Options(const std::stri
     queueLimit(100*1048576/*100M default limit*/),
     tcpNoDelay(false),
     requireEncrypted(false),
-    asyncQueueEvents(false),     // Must be false in a cluster.
     qmf2Support(true),
     qmf1Support(true),
     queueFlowStopRatio(80),
@@ -164,7 +164,6 @@ Broker::Options::Options(const std::stri
         ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
         ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location")
-        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
         ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
         ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
         ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
@@ -206,7 +205,6 @@ Broker::Broker(const Broker::Options& co
         *this),
     mgmtObject(0),
     queueCleaner(queues, &timer),
-    queueEvents(poller,!conf.asyncQueueEvents),
     recovery(true),
     inCluster(false),
     clusterUpdatee(false),
@@ -265,8 +263,6 @@ Broker::Broker(const Broker::Options& co
             federationTag = conf.fedTag;
     }
 
-    QueuePolicy::setDefaultMaxSize(conf.queueLimit);
-
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
@@ -425,7 +421,6 @@ void Broker::shutdown() {
 
 Broker::~Broker() {
     shutdown();
-    queueEvents.shutdown();
     finalize();                 // Finalize any plugins.
     if (config.auth)
         SaslAuthenticator::fini();
@@ -689,11 +684,15 @@ void Broker::createObject(const std::str
             //treat everything else as extension properties
             else extensions[i->first] = i->second;
         }
-        framing::FieldTable arguments;
-        amqp_0_10::translate(extensions, arguments);
+        QueueSettings settings(durable, autodelete);
+        Variant::Map unused;
+        settings.populate(extensions, unused);
+        qpid::amqp_0_10::translate(unused, settings.storeSettings);
+        //TODO: unused doesn't take store settings into account... so can't yet implement strict
+        QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);
 
         std::pair<boost::shared_ptr<Queue>, bool> result =
-            createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+            createQueue(name, settings, 0, alternateExchange, userId, connectionId);
         if (!result.second) {
             throw ObjectAlreadyExists(name);
         }
@@ -1041,8 +1040,7 @@ Broker::getKnownBrokersImpl()
     return knownBrokers;
 }
 
-bool Broker::deferDeliveryImpl(const std::string& ,
-                               const boost::intrusive_ptr<Message>& )
+bool Broker::deferDeliveryImpl(const std::string&, const Message&)
 { return false; }
 
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
@@ -1056,23 +1054,21 @@ const std::string Broker::TCP_TRANSPORT(
 
 std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
     const std::string& name,
-    bool durable,
-    bool autodelete,
+    const QueueSettings& settings,
     const OwnershipToken* owner,
     const std::string& alternateExchange,
-    const qpid::framing::FieldTable& arguments,
     const std::string& userId,
     const std::string& connectionId)
 {
     if (acl) {
         std::map<acl::Property, std::string> params;
         params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE));
         params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
-        params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
-        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
-        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
-        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+        params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
 
         if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
@@ -1084,7 +1080,7 @@ std::pair<boost::shared_ptr<Queue>, bool
         if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
     }
 
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments);
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
     if (result.second) {
         //add default binding:
         result.first->bind(exchanges.getDefault(), name);
@@ -1095,16 +1091,16 @@ std::pair<boost::shared_ptr<Queue>, bool
             //event instead?
             managementAgent->raiseEvent(
                 _qmf::EventQueueDeclare(connectionId, userId, name,
-                                        durable, owner, autodelete, alternateExchange,
-                                        ManagementAgent::toMap(arguments),
+                                        settings.durable, owner, settings.autodelete, alternateExchange,
+                                        settings.asMap(),
                                         "created"));
         }
         QPID_LOG_CAT(debug, model, "Create queue. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
-            << " durable:" << (durable ? "T" : "F")
+            << " durable:" << (settings.durable ? "T" : "F")
             << " owner:" << owner
-            << " autodelete:" << (autodelete ? "T" : "F")
+            << " autodelete:" << (settings.autodelete ? "T" : "F")
             << " alternateExchange:" << alternateExchange );
     }
     return result;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Aug 10 12:04:27 2012
@@ -33,7 +33,6 @@
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/QueueCleaner.h"
-#include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
 #include "qpid/broker/ExpiryPolicy.h"
@@ -75,7 +74,7 @@ namespace broker {
 class ConnectionState;
 class ExpiryPolicy;
 class Message;
-
+struct QueueSettings;
 static const  uint16_t DEFAULT_PORT=5672;
 
 struct NoSuchTransportException : qpid::Exception
@@ -117,7 +116,6 @@ class Broker : public sys::Runnable, pub
         bool requireEncrypted;
         std::string knownHosts;
         std::string saslConfigPath;
-        bool asyncQueueEvents;
         bool qmf2Support;
         bool qmf1Support;
         uint queueFlowStopRatio;    // producer flow control: on
@@ -177,11 +175,10 @@ class Broker : public sys::Runnable, pub
     Vhost::shared_ptr            vhostObject;
     System::shared_ptr           systemObject;
     QueueCleaner queueCleaner;
-    QueueEvents queueEvents;
     std::vector<Url> knownBrokers;
     std::vector<Url> getKnownBrokersImpl();
     bool deferDeliveryImpl(const std::string& queue,
-                           const boost::intrusive_ptr<Message>& msg);
+                           const Message& msg);
     std::string federationTag;
     bool recovery;
     bool inCluster, clusterUpdatee;
@@ -225,7 +222,6 @@ class Broker : public sys::Runnable, pub
     DtxManager& getDtxManager() { return dtxManager; }
     DataDir& getDataDir() { return dataDir; }
     Options& getOptions() { return config; }
-    QueueEvents& getQueueEvents() { return queueEvents; }
 
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
     boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -307,7 +303,8 @@ class Broker : public sys::Runnable, pub
      * context.
      *@return true if delivery of a message should be deferred.
      */
-    boost::function<bool (const std::string& queue, const boost::intrusive_ptr<Message>& msg)> deferDelivery;
+    boost::function<bool (const std::string& queue,
+                          const Message& msg)> deferDelivery;
 
     bool isAuthenticating ( ) { return config.auth; }
     bool isTimestamping() { return config.timestampRcvMsgs; }
@@ -316,11 +313,9 @@ class Broker : public sys::Runnable, pub
 
     QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> createQueue(
         const std::string& name,
-        bool durable,
-        bool autodelete,
+        const QueueSettings& settings,
         const OwnershipToken* owner,
         const std::string& alternateExchange,
-        const qpid::framing::FieldTable& arguments,
         const std::string& userId,
         const std::string& connectionId);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Fri Aug 10 12:04:27 2012
@@ -21,21 +21,23 @@
 #ifndef _Consumer_
 #define _Consumer_
 
-#include "qpid/broker/Message.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/broker/OwnershipToken.h"
+#include <boost/shared_ptr.hpp>
+#include <string>
 
 namespace qpid {
 namespace broker {
 
+class DeliveryRecord;
+class Message;
 class Queue;
 class QueueListeners;
 
 /**
  * Base class for consumers which represent a subscription to a queue.
  */
-class Consumer
-{
+class Consumer : public QueueCursor {
     const bool acquires;
     // inListeners allows QueueListeners to efficiently track if this
     // instance is registered for notifications without having to
@@ -47,22 +49,17 @@ class Consumer
  public:
     typedef boost::shared_ptr<Consumer> shared_ptr;
 
-    Consumer(const std::string& _name, bool preAcquires = true)
-      : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+    Consumer(const std::string& _name, SubscriptionType type)
+        : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {}
     virtual ~Consumer(){}
 
     bool preAcquires() const { return acquires; }
     const std::string& getName() const { return name; }
 
-    /**@return the position of the last message seen by this consumer */
-    virtual framing::SequenceNumber getPosition() const  { return position; }
-
-    virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
-
-    virtual bool deliver(QueuedMessage& msg) = 0;
+    virtual bool deliver(const QueueCursor& cursor, const Message& msg) = 0;
     virtual void notify() = 0;
-    virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
-    virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+    virtual bool filter(const Message&) { return true; }
+    virtual bool accept(const Message&) { return true; }
     virtual OwnershipToken* getSession() = 0;
     virtual void cancel() = 0;
 
@@ -75,7 +72,7 @@ class Consumer
      * Not to be confused with accept() above, which is asking if
      * this consumer will consume/browse the message.
      */
-    virtual void acknowledged(const QueuedMessage&) = 0;
+    virtual void acknowledged(const DeliveryRecord&) = 0;
 
     /** Called if queue has been deleted, if true suppress the error message.
      * Used by HA ReplicatingSubscriptions where such errors are normal.
@@ -83,7 +80,7 @@ class Consumer
     virtual bool hideDeletedError() { return false; }
 
   protected:
-    framing::SequenceNumber position;
+    //framing::SequenceNumber position;
 
   private:
     friend class QueueListeners;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Fri Aug 10 12:04:27 2012
@@ -21,17 +21,22 @@
 #ifndef _Deliverable_
 #define _Deliverable_
 
-#include "qpid/broker/Message.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
     namespace broker {
-        class Deliverable{
+        class Message;
+        class Queue;
+
+    class Deliverable : public AsyncCompletion {
         public:
             bool delivered;
             Deliverable() : delivered(false) {}
 
 	    virtual Message& getMessage() = 0;
-	    
+
             virtual void deliverTo(const boost::shared_ptr<Queue>& queue) = 0;
             virtual uint64_t contentSize() { return 0; }
             virtual ~Deliverable(){}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Fri Aug 10 12:04:27 2012
@@ -24,22 +24,20 @@
 
 using namespace qpid::broker;
 
-DeliverableMessage::DeliverableMessage(const boost::intrusive_ptr<Message>& _msg) : msg(_msg)
-{
-}
+DeliverableMessage::DeliverableMessage(const Message& _msg, TxBuffer* _txn) : msg(_msg), txn(_txn) {}
 
 void DeliverableMessage::deliverTo(const boost::shared_ptr<Queue>& queue)
 {
-    queue->deliver(msg);    
+    queue->deliver(msg, txn);
     delivered = true;
 }
 
 Message& DeliverableMessage::getMessage()
 {
-    return *msg;
+    return msg;
 }
 
-uint64_t DeliverableMessage::contentSize ()
+uint64_t DeliverableMessage::contentSize()
 {
-    return msg->contentSize ();
+    return msg.getContentSize();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Fri Aug 10 12:04:27 2012
@@ -25,14 +25,15 @@
 #include "qpid/broker/Deliverable.h"
 #include "qpid/broker/Message.h"
 
-#include <boost/intrusive_ptr.hpp>
-
 namespace qpid {
     namespace broker {
-        class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable{
-            boost::intrusive_ptr<Message> msg;
+    class TxBuffer;
+        class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable
+        {
+            Message msg;
+            TxBuffer* txn;
         public:
-            QPID_BROKER_EXTERN DeliverableMessage(const boost::intrusive_ptr<Message>& msg);
+            QPID_BROKER_EXTERN DeliverableMessage(const Message& msg, TxBuffer* txn);
             QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
             QPID_BROKER_EXTERN Message& getMessage();
             QPID_BROKER_EXTERN uint64_t contentSize();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Fri Aug 10 12:04:27 2012
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DeliveryAdapter_
-#define _DeliveryAdapter_
-
-#include "qpid/broker/DeliveryId.h"
-#include "qpid/broker/Message.h"
-#include "qpid/framing/amqp_types.h"
-
-namespace qpid {
-namespace broker {
-
-class DeliveryRecord;
-
-/**
- * The intention behind this interface is to separate the generic
- * handling of some form of message delivery to clients that is
- * contained in the version independent Channel class from the
- * details required for a particular situation or
- * version. i.e. where the existing adapters allow (through
- * supporting the generated interface for a version of the
- * protocol) inputs of a channel to be adapted to the version
- * independent part, this does the same for the outputs.
- */
-class DeliveryAdapter
-{
-  public:
-    virtual void deliver(DeliveryRecord&, bool sync) = 0;
-    virtual ~DeliveryAdapter(){}
-};
-
-}}
-
-
-#endif

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 10 12:04:27 2012
@@ -24,6 +24,7 @@
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/MessageTransferBody.h"
@@ -32,77 +33,46 @@ using namespace qpid;
 using namespace qpid::broker;
 using std::string;
 
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueueCursor& _msg,
+                               framing::SequenceNumber _msgId,
                                const Queue::shared_ptr& _queue,
                                const std::string& _tag,
                                const boost::shared_ptr<Consumer>& _consumer,
                                bool _acquired,
                                bool accepted,
                                bool _windowing,
-                               uint32_t _credit):
-    msg(_msg),
-    queue(_queue),
-    tag(_tag),
-    consumer(_consumer),
-    acquired(_acquired),
-    acceptExpected(!accepted),
-    cancelled(false),
-    completed(false),
-    ended(accepted && acquired),
-    windowing(_windowing),
-    credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
+                               uint32_t _credit) : msg(_msg),
+                                                   queue(_queue),
+                                                   tag(_tag),
+                                                   consumer(_consumer),
+                                                   acquired(_acquired),
+                                                   acceptExpected(!accepted),
+                                                   cancelled(false),
+                                                   completed(false),
+                                                   ended(accepted && acquired),
+                                                   windowing(_windowing),
+                                                   credit(_credit),
+                                                   msgId(_msgId)
 {}
 
 bool DeliveryRecord::setEnded()
 {
     ended = true;
-    //reset msg pointer, don't need to hold on to it anymore
-    msg.payload = boost::intrusive_ptr<Message>();
     QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
     return isRedundant();
 }
 
-void DeliveryRecord::redeliver(SemanticState* const session) {
-    if (!ended) {
-        if(cancelled){
-            //if subscription was cancelled, requeue it (waiting for
-            //final confirmation for AMQP WG on this case)
-            requeue();
-        }else{
-            msg.payload->redeliver();//mark as redelivered
-            session->deliver(*this, false);
-        }
-    }
-}
-
-void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize)
-{
-    id = deliveryId;
-    if (msg.payload->getRedelivered()){
-        msg.payload->setRedelivered();
-    }
-    msg.payload->adjustTtl();
-
-    framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1)));
-    method.setEof(false);
-    h.handle(method);
-    msg.payload->sendHeader(h, framesize);
-    msg.payload->sendContent(*queue, h, framesize);
-}
-
-void DeliveryRecord::requeue() const
+void DeliveryRecord::requeue()
 {
     if (acquired && !ended) {
-        msg.payload->redeliver();
-        queue->requeue(msg);
+        queue->release(msg);
     }
 }
 
 void DeliveryRecord::release(bool setRedelivered)
 {
     if (acquired && !ended) {
-        if (setRedelivered) msg.payload->redeliver();
-        queue->requeue(msg);
+        queue->release(msg, setRedelivered);
         acquired = false;
         setEnded();
     } else {
@@ -110,13 +80,14 @@ void DeliveryRecord::release(bool setRed
     }
 }
 
-void DeliveryRecord::complete()  {
+void DeliveryRecord::complete()
+{
     completed = true;
 }
 
 bool DeliveryRecord::accept(TransactionContext* ctxt) {
     if (!ended) {
-        if (consumer) consumer->acknowledged(getMessage());
+        if (consumer) consumer->acknowledged(*this);
         if (acquired) queue->dequeue(ctxt, msg);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
@@ -124,31 +95,22 @@ bool DeliveryRecord::accept(TransactionC
     return isRedundant();
 }
 
-void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const
+{
     if (acquired && !ended) {
         queue->dequeue(ctxt, msg);
     }
 }
 
-void DeliveryRecord::committed() const{
+void DeliveryRecord::committed() const
+{
     queue->dequeueCommitted(msg);
 }
 
 void DeliveryRecord::reject()
 {
     if (acquired && !ended) {
-        Exchange::shared_ptr alternate = queue->getAlternateExchange();
-        if (alternate) {
-            DeliverableMessage delivery(msg.payload);
-            alternate->routeWithAlternate(delivery);
-            QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
-                     << alternate->getName());
-        } else {
-            //just drop it
-            QPID_LOG(info, "Dropping rejected message from " << queue->getName());
-        }
-        queue->countRejected();
-        dequeue();
+        queue->reject(msg);
         setEnded();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 10 12:04:27 2012
@@ -26,15 +26,17 @@
 #include <deque>
 #include <vector>
 #include <ostream>
+#include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/broker/DeliveryId.h"
 #include "qpid/broker/Message.h"
 
 namespace qpid {
 namespace broker {
 
+class Queue;
 class TransactionContext;
 class SemanticState;
 struct AckRange;
@@ -45,7 +47,7 @@ class Consumer;
  */
 class DeliveryRecord
 {
-    QueuedMessage msg;
+    QueueCursor msg;
     mutable boost::shared_ptr<Queue> queue;
     std::string tag;    // name of consumer
     boost::shared_ptr<Consumer> consumer;
@@ -65,9 +67,10 @@ class DeliveryRecord
      * after that).
      */
     uint32_t credit;
+    framing::SequenceNumber msgId;
 
   public:
-    QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
+    QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor, framing::SequenceNumber msgId,
                                       const boost::shared_ptr<Queue>& queue, 
                                       const std::string& tag,
                                       const boost::shared_ptr<Consumer>& consumer,
@@ -80,11 +83,10 @@ class DeliveryRecord
     bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
 
     void dequeue(TransactionContext* ctxt = 0) const;
-    void requeue() const;
+    void requeue();
     void release(bool setRedelivered);
     void reject();
     void cancel(const std::string& tag);
-    void redeliver(SemanticState* const);
     void acquire(DeliveryIds& results);
     void complete();
     bool accept(TransactionContext* ctxt); // Returns isRedundant()
@@ -102,13 +104,13 @@ class DeliveryRecord
     uint32_t getCredit() const;
     const std::string& getTag() const { return tag; }
 
-    void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
     void setId(DeliveryId _id) { id = _id; }
 
     typedef std::deque<DeliveryRecord> DeliveryRecords;
     static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
-    const QueuedMessage& getMessage() const { return msg; }
+    const QueueCursor& getMessage() const { return msg; }
     framing::SequenceNumber getId() const { return id; }
+    framing::SequenceNumber getMessageId() const { return msgId; }
     boost::shared_ptr<Queue> getQueue() const { return queue; }
 
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Fri Aug 10 12:04:27 2012
@@ -40,7 +40,6 @@ class DtxAck : public TxOp{
     virtual void commit() throw();
     virtual void rollback() throw();
     virtual ~DtxAck(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
     const DeliveryRecords& getPending() const { return pending; }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Aug 10 12:04:27 2012
@@ -25,6 +25,7 @@
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/FedOps.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/MessageProperties.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
@@ -62,10 +63,10 @@ Exchange::PreRoute::PreRoute(Deliverable
 
         if (parent->sequence){
             parent->sequenceNo++;
-            msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
+            msg.getMessage().addAnnotation(qpidMsgSequence,parent->sequenceNo);
         }
         if (parent->ive) {
-            parent->lastMsg =  &( msg.getMessage());
+            parent->lastMsg = msg.getMessage();
         }
     }
 }
@@ -111,12 +112,6 @@ void Exchange::doRoute(Deliverable& msg,
     int count = 0;
 
     if (b.get()) {
-        // Block the content release if the message is transient AND there is more than one binding
-        if (!msg.getMessage().isPersistent() && b->size() > 1) {
-            msg.getMessage().blockContentRelease();
-        }
-
-
         ExInfo error(getName()); // Save exception to throw at the end.
         for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
             try {
@@ -161,8 +156,8 @@ void Exchange::doRoute(Deliverable& msg,
 }
 
 void Exchange::routeIVE(){
-    if (ive && lastMsg.get()){
-        DeliverableMessage dmsg(lastMsg);
+    if (ive && lastMsg){
+        DeliverableMessage dmsg(lastMsg, 0);
         route(dmsg);
     }
 }
@@ -400,9 +395,9 @@ bool Exchange::MatchQueue::operator()(Ex
     return b->queue == queue;
 }
 
-void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
-    msg->setExchange(getName());
-}
+//void Exchange::setProperties(Message& msg) {
+//    qpid::broker::amqp_0_10::MessageTransfer::setExchange(msg, getName());
+//}
 
 bool Exchange::routeWithAlternate(Deliverable& msg)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Aug 10 12:04:27 2012
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/PersistableExchange.h"
 #include "qpid/framing/FieldTable.h"
@@ -74,7 +75,7 @@ protected:
     mutable qpid::sys::Mutex sequenceLock;
     int64_t sequenceNo;
     bool ive;
-    boost::intrusive_ptr<Message> lastMsg;
+    Message lastMsg;
 
     class PreRoute{
     public:
@@ -196,7 +197,7 @@ public:
     virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
-    QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
+    //QPID_BROKER_EXTERN virtual void setProperties(Message&);
     virtual void route(Deliverable& msg) = 0;
 
     //PersistableExchange:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp Fri Aug 10 12:04:27 2012
@@ -27,7 +27,7 @@ namespace broker {
 
 ExpiryPolicy::~ExpiryPolicy() {}
 
-bool ExpiryPolicy::hasExpired(Message& m) {
+bool ExpiryPolicy::hasExpired(const Message& m) {
     return m.getExpiration() < sys::AbsTime::now();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h Fri Aug 10 12:04:27 2012
@@ -42,7 +42,7 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPol
 {
   public:
     QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
-    QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
+    QPID_BROKER_EXTERN virtual bool hasExpired(const Message&);
     QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
 };
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp Fri Aug 10 12:04:27 2012
@@ -19,7 +19,8 @@
  *
  */
 #include "qpid/broker/Fairshare.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
@@ -83,17 +84,6 @@ bool Fairshare::setState(uint p, uint c)
     return true;
 }
 
-bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
-{
-    const uint start = p = currentLevel();
-    do {
-        if (!messages[p].empty()) return true;
-    } while ((p = nextLevel()) != start);
-    return false;
-}
-
-
-
 bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
 {
     const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
@@ -106,82 +96,30 @@ bool Fairshare::setState(Messages& m, ui
     return fairshare && fairshare->setState(priority, count);
 }
 
-int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
+PriorityQueue::Priority Fairshare::firstLevel()
 {
-    qpid::framing::FieldTable::ValuePtr v;
-    std::vector<std::string>::const_iterator i = keys.begin(); 
-    while (!v && i != keys.end()) {
-        v = settings.get(*i++);
-    }
-
-    if (!v) {
-        return 0;
-    } else if (v->convertsTo<int>()) {
-        return v->get<int>();
-    } else if (v->convertsTo<std::string>()){
-        std::string s = v->get<std::string>();
-        try {
-            return boost::lexical_cast<int>(s);
-        } catch(const boost::bad_lexical_cast&) {
-            QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << s);
-            return 0;
-        }
-    } else {
-        QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << *v);
-        return 0;
-    }
+    return Priority(currentLevel());
 }
 
-int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std::string& key)
+bool Fairshare::nextLevel(Priority& p)
 {
-    return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
-}
-
-int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
-{
-    return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
-}
-
-std::auto_ptr<Fairshare> getFairshareForKey(const qpid::framing::FieldTable& settings, uint levels, const std::string& key)
-{
-    uint defaultLimit = getIntegerSettingForKey(settings, key);
-    std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
-    for (uint i = 0; i < levels; i++) {
-        std::string levelKey = (boost::format("%1%-%2%") % key % i).str();
-        if(settings.isSet(levelKey)) {
-            fairshare->setLimit(i, getIntegerSettingForKey(settings, levelKey));
-        }
-    }
-    if (!fairshare->isNull()) {
-        return fairshare;
+    int next = nextLevel();
+    if (next == p.start) {
+        return false;
     } else {
-        return std::auto_ptr<Fairshare>();
-    }
-}
-
-std::auto_ptr<Fairshare> getFairshare(const qpid::framing::FieldTable& settings,
-                                      uint levels,
-                                      const std::vector<std::string>& keys)
-{
-    std::auto_ptr<Fairshare> fairshare;
-    for (std::vector<std::string>::const_iterator i = keys.begin(); i != keys.end() && !fairshare.get(); ++i) {
-        fairshare = getFairshareForKey(settings, levels, *i);
+        p.current = next;
+        return true;
     }
-    return fairshare;
 }
 
-std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
+std::auto_ptr<Messages> Fairshare::create(const QueueSettings& settings)
 {
-    using boost::assign::list_of;
-    std::auto_ptr<Messages> result;
-    size_t levels = getSetting(settings, list_of<std::string>("qpid.priorities")("x-qpid-priorities"), 0, 100);
-    if (levels) {
-        std::auto_ptr<Fairshare> fairshare =
-            getFairshare(settings, levels, list_of<std::string>("qpid.fairshare")("x-qpid-fairshare"));
-        if (fairshare.get()) result = fairshare;
-        else result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+    std::auto_ptr<Fairshare> fairshare(new Fairshare(settings.priorities, settings.defaultFairshare));
+    for (uint i = 0; i < settings.priorities; i++) {
+        std::map<uint32_t,uint32_t>::const_iterator l = settings.fairshare.find(i);
+        if (l != settings.fairshare.end()) fairshare->setLimit(i, l->second);
     }
-    return result;
+    return std::auto_ptr<Messages>(fairshare.release());
 }
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h Fri Aug 10 12:04:27 2012
@@ -24,13 +24,11 @@
 #include "qpid/broker/PriorityQueue.h"
 
 namespace qpid {
-namespace framing {
-class FieldTable;
-}
 namespace broker {
+struct QueueSettings;
 
 /**
- * Modifies a basic prioirty queue by limiting the number of messages
+ * Modifies a basic priority queue by limiting the number of messages
  * from each priority level that are dispatched before allowing
  * dispatch from the next level.
  */
@@ -42,7 +40,7 @@ class Fairshare : public PriorityQueue
     bool setState(uint priority, uint count);
     void setLimit(size_t level, uint limit);
     bool isNull();
-    static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
+    static std::auto_ptr<Messages> create(const QueueSettings& settings);
     static bool getState(const Messages&, uint& priority, uint& count);
     static bool setState(Messages&, uint priority, uint count);
   private:
@@ -54,7 +52,8 @@ class Fairshare : public PriorityQueue
     uint currentLevel();
     uint nextLevel();
     bool limitReached();
-    bool findFrontLevel(uint& p, PriorityLevels&);
+    Priority firstLevel();
+    bool nextLevel(Priority& );
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp Fri Aug 10 12:04:27 2012
@@ -28,21 +28,14 @@ using namespace qpid::broker;
 FifoDistributor::FifoDistributor(Messages& container)
     : messages(container) {}
 
-bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
+bool FifoDistributor::acquire(const std::string&, Message& msg)
 {
-    return messages.consume(next);
-}
-
-bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
-{
-    // by default, all messages present on the queue may be allocated as they have yet to
-    // be acquired.
-    return true;
-}
-
-bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
-    return messages.browse(c->getPosition(), next, !c->browseAcquired());
+    if (msg.getState() == AVAILABLE) {
+        msg.setState(ACQUIRED);
+        return true;
+    } else {
+        return false;
+    }
 }
 
 void FifoDistributor::query(qpid::types::Variant::Map&) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h Fri Aug 10 12:04:27 2012
@@ -38,15 +38,7 @@ class FifoDistributor : public MessageDi
  public:
     FifoDistributor(Messages& container);
 
-    /** Locking Note: all methods assume the caller is holding the Queue::messageLock
-     * during the method call.
-     */
-
-    /** MessageDistributor interface */
-
-    bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
-    bool allocate(const std::string& consumer, const QueuedMessage& target);
-    bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+    bool acquire(const std::string& consumer, Message& target);
     void query(qpid::types::Variant::Map&) const;
 
  private:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Aug 10 12:04:27 2012
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/MapHandler.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
@@ -55,6 +56,100 @@ namespace {
     const std::string fedOpUnbind("U");
     const std::string fedOpReorigin("R");
     const std::string fedOpHello("H");
+
+std::string getMatch(const FieldTable* args)
+{
+    if (!args) {
+        throw InternalErrorException(QPID_MSG("No arguments given."));
+    }
+    FieldTable::ValuePtr what = args->get(x_match);
+    if (!what) {
+        return empty;
+    }
+    if (!what->convertsTo<std::string>()) {
+        throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
+    }
+    return what->get<std::string>();
+}
+class Matcher : public MapHandler
+{
+  public:
+    Matcher(const FieldTable& b) : binding(b), matched(0) {}
+    void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); }
+    void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); }
+    void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); }
+    void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); }
+    void handleInt8(const MapHandler::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); }
+    void handleInt16(const MapHandler::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); }
+    void handleInt32(const MapHandler::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); }
+    void handleInt64(const MapHandler::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); }
+    void handleFloat(const MapHandler::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); }
+    void handleDouble(const MapHandler::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); }
+    void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+    {
+        processString(std::string(key.data, key.size), std::string(value.data, value.size));
+    }
+    void handleVoid(const MapHandler::CharSequence& key)
+    {
+        valueCheckRequired(std::string(key.data, key.size));
+    }
+    bool matches()
+    {
+        std::string what = getMatch(&binding);
+        if (what == all) {
+            //must match all entries in the binding, except the match mode indicator
+            return matched == binding.size() - 1;
+        } else if (what == any) {
+            //match any of the entries in the binding
+            return matched > 0;
+        } else {
+            return false;
+        }
+    }
+  private:
+    bool valueCheckRequired(const std::string& key)
+    {
+        FieldTable::ValuePtr v = binding.get(key);
+        if (v) {
+            if (v->getType() == 0xf0/*VOID*/) {
+                ++matched;
+                return false;
+            } else {
+                return true;
+            }
+        } else {
+            return false;
+        }
+    }
+
+    void processString(const std::string& key, const std::string& actual)
+    {
+        if (valueCheckRequired(key) && binding.getAsString(key) == actual) {
+            ++matched;
+        }
+    }
+    void processFloat(const std::string& key, double actual)
+    {
+        double bound;
+        if (valueCheckRequired(key) && binding.getDouble(key, bound) && bound == actual) {
+            ++matched;
+        }
+    }
+    void processInt(const std::string& key, int64_t actual)
+    {
+        if (valueCheckRequired(key) && binding.getAsInt64(key) == actual) {
+            ++matched;
+        }
+    }
+    void processUint(const std::string& key, uint64_t actual)
+    {
+        if (valueCheckRequired(key) && binding.getAsUInt64(key) == actual) {
+            ++matched;
+        }
+    }
+    const FieldTable& binding;
+    size_t matched;
+};
 }
 
 HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
@@ -72,21 +167,6 @@ HeadersExchange::HeadersExchange(const s
         mgmtExchange->set_type (typeName);
 }
 
-std::string HeadersExchange::getMatch(const FieldTable* args)
-{
-    if (!args) {
-        throw InternalErrorException(QPID_MSG("No arguments given."));
-    }
-    FieldTable::ValuePtr what = args->get(x_match);
-    if (!what) {
-        return empty;
-    }
-    if (!what->convertsTo<std::string>()) {
-        throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
-    }
-    return what->get<std::string>();
-}
-
 bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
 {
     string fedOp(fedOpBind);
@@ -196,28 +276,16 @@ bool HeadersExchange::unbind(Queue::shar
 
 void HeadersExchange::route(Deliverable& msg)
 {
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
-    if (!args) {
-        //can't match if there were no headers passed in
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgReceives();
-            mgmtExchange->inc_byteReceives(msg.contentSize());
-            mgmtExchange->inc_msgDrops();
-            mgmtExchange->inc_byteDrops(msg.contentSize());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsNoRoute();
-        }
-        return;
-    }
-
     PreRoute pr(msg, this);
 
     BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
     Bindings::ConstPtr p = bindings.snapshot();
     if (p.get()) {
         for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
-            if (match((*i).binding->args, *args)) {
-                b->push_back((*i).binding);
+            Matcher matcher(i->binding->args);
+            msg.getMessage().processProperties(matcher);
+            if (matcher.matches()) {
+                b->push_back(i->binding);
             }
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Fri Aug 10 12:04:27 2012
@@ -73,9 +73,6 @@ class HeadersExchange : public virtual E
 
     Bindings bindings;
     qpid::sys::Mutex lock;
-
-    static std::string getMatch(const framing::FieldTable* args);
-
   protected:
     void getNonFedArgs(const framing::FieldTable* args,
                        framing::FieldTable& nonFedArgs);

Added: qpid/trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,226 @@
+#ifndef QPID_BROKER_INDEXEDDEQUE_H
+#define QPID_BROKER_INDEXEDDEQUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/log/Statement.h"
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Template for a deque whose contents can be refered to by
+ * QueueCursor
+ */
+template <typename T> class IndexedDeque
+{
+  public:
+    typedef boost::function1<T, qpid::framing::SequenceNumber> Padding;
+    IndexedDeque(Padding p) : head(0), version(0), padding(p) {}
+
+    bool index(const QueueCursor& cursor, size_t& result)
+    {
+        return cursor.valid && index(qpid::framing::SequenceNumber(cursor.position + 1), result);
+    }
+
+    /**
+     * Finds the index for the message with the specified sequence number.
+     *
+     * @returns true if a message was found with the specified sequence,
+     * in which case the second parameter will be set to the index of that
+     * message; false if no message with that sequence exists, in which
+     * case the second parameter will be 0 if the sequence is less than
+     * that of the first message and non-zero if it is greater than that
+     * of the last message
+     */
+    bool index(const qpid::framing::SequenceNumber& position, size_t& i)
+    {
+        //assuming a monotonic sequence, with no messages removed except
+        //from the ends of the deque, we can use the position to determine
+        //an index into the deque
+        if (messages.size()) {
+            qpid::framing::SequenceNumber front(messages.front().getSequence());
+            if (position < front) {
+                i = 0;
+            } else {
+                i = position - front;
+                return i < messages.size();
+            }
+        }
+        return false;
+    }
+
+    bool deleted(const QueueCursor& cursor)
+    {
+        size_t i;
+        if (cursor.valid && index(cursor.position, i)) {
+            messages[i].setState(DELETED);
+            clean();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    T& publish(const T& added)
+    {
+        // QPID-4046: let producer help clean the backlog of deleted messages
+        clean();
+        //for ha replication, the queue can sometimes be reset by
+        //removing some of the more recent messages, in this case we
+        //need to ensure the DELETED records at the tail do not interfere with indexing
+        while (messages.size() && added.getSequence() <= messages.back().getSequence() && messages.back().getState() == DELETED)
+            messages.pop_back();
+        if (messages.size() && added.getSequence() <= messages.back().getSequence()) throw qpid::Exception(QPID_MSG("Index out of sequence!"));
+
+        //add padding to prevent gaps in sequence, which break the index
+        //calculation (needed for queue replication)
+        while (messages.size() && (added.getSequence() - messages.back().getSequence()) > 1)
+            messages.push_back(padding(messages.back().getSequence() + 1));
+
+        messages.push_back(added);
+        T& m = messages.back();
+        m.setState(AVAILABLE);
+        if (head >= messages.size()) head = messages.size() - 1;
+        QPID_LOG(debug, "Message " << &m << " published, state is " << m.getState() << " (head is now " << head << ")");
+        return m;
+    }
+
+    T* release(const QueueCursor& cursor)
+    {
+        size_t i;
+        if (cursor.valid && index(cursor.position, i)) {
+            messages[i].setState(AVAILABLE);
+            ++version;
+            QPID_LOG(debug, "Released message at position " << cursor.position << ", index " << i);
+            return &messages[i];
+        } else {
+            if (!cursor.valid) { QPID_LOG(debug, "Could not release message; cursor was invalid");}
+            else { QPID_LOG(debug, "Could not release message at position " << cursor.position); }
+            return 0;
+        }
+    }
+
+    bool reset(const QueueCursor& cursor)
+    {
+        return !cursor.valid || (cursor.type == CONSUMER && cursor.version != version);
+    }
+
+    T* next(QueueCursor& cursor)
+    {
+        size_t i;
+        if (reset(cursor)) i = head; //start from head
+        else index(cursor, i); //get first message that is greater than position
+
+        if (cursor.valid) {
+            QPID_LOG(debug, "next() called for cursor at " << cursor.position << ", index set to " << i << " (of " << messages.size() << ")");
+        } else {
+            QPID_LOG(debug, "next() called for invalid cursor, index started at " << i << " (of " << messages.size() << ")");
+        }
+        while (i < messages.size()) {
+            T& m = messages[i++];
+            if (m.getState() == DELETED) continue;
+            cursor.setPosition(m.getSequence(), version);
+            QPID_LOG(debug, "in next(), cursor set to " << cursor.position);
+
+            if (cursor.check(m)) {
+                QPID_LOG(debug, "in next(), returning message at " << cursor.position);
+                return &m;
+            }
+        }
+        QPID_LOG(debug, "no message to return from next");
+        return 0;
+    }
+
+    size_t size()
+    {
+        size_t count(0);
+        for (size_t i = head; i < messages.size(); ++i) {
+            if (messages[i].getState() == AVAILABLE) ++count;
+        }
+        return count;
+    }
+
+    T* find(const qpid::framing::SequenceNumber& position, QueueCursor* cursor)
+    {
+        size_t i;
+        if (index(position, i)){
+            T& m = messages[i];
+            if (cursor) cursor->setPosition(position, version);
+            if (m.getState() == AVAILABLE || m.getState() == ACQUIRED) {
+                return &m;
+            }
+        } else if (cursor) {
+            if (i >= messages.size()) cursor->setPosition(position, version);//haven't yet got a message with that seq no
+            else if (i == 0) cursor->valid = false;//reset
+        }
+        return 0;
+    }
+
+    T* find(const QueueCursor& cursor)
+    {
+        if (cursor.valid) return find(cursor.position, 0);
+        else return 0;
+    }
+
+    void clean()
+    {
+        // QPID-4046: If a queue has multiple consumers, then it is possible for a large
+        // collection of deleted messages to build up.  Limit the number of messages cleaned
+        // up on each call to clean().
+        size_t count = 0;
+        while (messages.size() && messages.front().getState() == DELETED && count < 10) {
+            messages.pop_front();
+            count += 1;
+        }
+        head = (head > count) ? head - count : 0;
+        QPID_LOG(debug, "clean(): " << messages.size() << " messages remain; head is now " << head);
+    }
+
+    void foreach(Messages::Functor f)
+    {
+        for (typename Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+            if (i->getState() == AVAILABLE) {
+                f(*i);
+            }
+        }
+        clean();
+    }
+
+    void resetCursors()
+    {
+        ++version;
+    }
+
+    typedef std::deque<T> Deque;
+    Deque messages;
+    size_t head;
+    int32_t version;
+    Padding padding;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_INDEXEDDEQUE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Fri Aug 10 12:04:27 2012
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/LegacyLVQ.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueuedMessage.h"
-
-namespace qpid {
-namespace broker {
-
-LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
-
-void LegacyLVQ::setNoBrowse(bool b)
-{
-    noBrowse = b;
-}
-bool LegacyLVQ::deleted(const QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(message.position);
-    if (i != messages.end() && i->second.payload == message.payload) {
-        erase(i);
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
-        i->second.status = QueuedMessage::ACQUIRED;
-        message = i->second;
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool LegacyLVQ::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
-{
-    if (MessageMap::browse(position, message, unacquired)) {
-        if (!noBrowse) index.erase(getKey(message));
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
-{
-    //Hack to disable LVQ behaviour on cluster update:
-    if (broker && broker->isClusterUpdatee()) {
-        messages[added.position] = added;
-        return false;
-    } else {
-        return MessageMap::push(added, removed);
-    }
-}
-
-const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{
-    //add the new message into the original position of the replaced message
-    Ordering::iterator i = messages.find(original.position);
-    if (i != messages.end()) {
-        i->second = update;
-        i->second.position = original.position;
-        return i->second;
-    } else {
-        QPID_LOG(error, "Failed to replace message at " << original.position);
-        return update;
-    }
-}
-
-void LegacyLVQ::removeIf(Predicate p)
-{
-    //Note: This method is currently called periodically on the timer
-    //thread to expire messages. In a clustered broker this means that
-    //the purging does not occur on the cluster event dispatch thread
-    //and consequently that is not totally ordered w.r.t other events
-    //(including publication of messages). The cluster does ensure
-    //that the actual expiration of messages (as distinct from the
-    //removing of those expired messages from the queue) *is*
-    //consistently ordered w.r.t. cluster events. This means that
-    //delivery of messages is in general consistent across the cluster
-    //inspite of any non-determinism in the triggering of a
-    //purge. However at present purging a last value queue (of the
-    //legacy sort) could potentially cause inconsistencies in the
-    //cluster (as the order w.r.t publications can affect the order in
-    //which messages appear in the queue). Consequently periodic
-    //purging of an LVQ is not enabled if the broker is clustered
-    //(expired messages will be removed on delivery and consolidated
-    //by key as part of normal LVQ operation).
-    if (!broker || !broker->isInCluster())
-        MessageMap::removeIf(p);
-}
-
-std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, 
-                                                   const std::string& key, bool noBrowse, Broker* broker)
-{
-    LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
-    if (lvq) { 
-        lvq->setNoBrowse(noBrowse);
-        return current;
-    } else {
-        return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
-    }
-}
-
-}} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h Fri Aug 10 12:04:27 2012
@@ -1,60 +0,0 @@
-#ifndef QPID_BROKER_LEGACYLVQ_H
-#define QPID_BROKER_LEGACYLVQ_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/MessageMap.h"
-#include <memory>
-
-namespace qpid {
-namespace broker {
-class Broker;
-
-/**
- * This class encapsulates the behaviour of the old style LVQ where a
- * message replacing another messages for the given key will use the
- * position in the queue of the previous message. This however causes
- * problems for browsing. Either browsers stop the coalescing of
- * messages by key (default) or they may mis updates (if the no-browse
- * option is specified).
- */
-class LegacyLVQ : public MessageMap
-{
-  public:
-    LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
-    bool deleted(const QueuedMessage&);
-    bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void removeIf(Predicate);
-    void setNoBrowse(bool);
-    static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current, 
-                                                   const std::string& key, bool noBrowse,
-                                                   Broker* broker);
-  protected:
-    bool noBrowse;
-    Broker* broker;
-
-    const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
-};
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_LEGACYLVQ_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Fri Aug 10 12:04:27 2012
@@ -92,10 +92,10 @@ public:
 
     // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
     // and saving them should the Link need to reconnect.
-    void route(broker::Deliverable& msg)
+    void route(broker::Deliverable& /*msg*/)
     {
         if (!link) return;
-        const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+        const framing::FieldTable* headers = 0;//TODO: msg.getMessage().getApplicationHeaders();
         framing::Array addresses;
         if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
             // convert the Array of addresses to a single Url container for used with setUrl():

Added: qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LossyQueue.h"
+#include "QueueDepth.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+namespace {
+bool isLowerPriorityThan(uint8_t priority, const Message& m)
+{
+    return m.getPriority() <= priority;
+}
+}
+
+LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+    : Queue(n, s, ms, p, b) {}
+
+bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
+{
+    if (increment.getSize() > settings.maxDepth.getSize()) {
+        if (mgmtObject) {
+            mgmtObject->inc_discardsOverflow();
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_discardsOverflow();
+        }
+        throw qpid::framing::ResourceLimitExceededException(QPID_MSG("Message larger than configured maximum depth on "
+                                                                     << name << ": size=" << increment.getSize() << ", max-size=" << settings.maxDepth.getSize()));
+    }
+
+    while (settings.maxDepth && (current + increment > settings.maxDepth)) {
+        QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
+        qpid::sys::Mutex::ScopedUnlock u(messageLock);
+        //TODO: arguably we should try and purge expired messages first but that is potentially expensive
+        if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE)) {
+            if (mgmtObject) {
+                mgmtObject->inc_discardsRing(1);
+                if (brokerMgmtObject)
+                    brokerMgmtObject->inc_discardsRing(1);
+            }
+        } else {
+            //should only be the case for a non-empty queue if we are
+            //testing priority and there was no lower (or equal)
+            //priority message available to purge
+            break;
+        }
+    }
+    if (settings.maxDepth && (current + increment > settings.maxDepth)) {
+        //will only be the case where we were unable to purge another
+        //message, which should only be the case if we are purging
+        //based on priority and there was no message with a lower (or
+        //equal) priority than this one, meaning that we drop this
+        //current message
+        if (mgmtObject) {
+            mgmtObject->inc_discardsRing(1);
+            if (brokerMgmtObject)
+                brokerMgmtObject->inc_discardsRing(1);
+        }
+        return false;
+    } else {
+        //have sufficient space for this message
+        current += increment;
+        return true;
+    }
+}
+}} // namespace qpid::broker



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org