You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/17 15:08:18 UTC

svn commit: r1071615 [1/3] - in /qpid/branches/qpid-2935/qpid: ./ bin/ cpp/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/examples/ruby/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/ cpp/bin...

Author: kgiusti
Date: Thu Feb 17 14:08:14 2011
New Revision: 1071615

URL: http://svn.apache.org/viewvc?rev=1071615&view=rev
Log:
QPID-2935: sync with latest trunk

Added:
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
    qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/dotnet/Makefile.am
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/bindings/qpid/dotnet/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/build-aux/.gitignore
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/build-aux/.gitignore
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Fairshare.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Fairshare.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LegacyLVQ.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageDeque.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageDeque.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageMap.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageMap.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Messages.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PriorityQueue.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueObserver.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
      - copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
      - copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
      - copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
      - copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
      - copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
Removed:
    qpid/branches/qpid-2935/qpid/java/common/src/main/resources/
    qpid/branches/qpid-2935/qpid/java/systests/src/old_test/
    qpid/branches/qpid-2935/qpid/java/tools/etc/jndi.properties
Modified:
    qpid/branches/qpid-2935/qpid/   (props changed)
    qpid/branches/qpid-2935/qpid/bin/release.sh
    qpid/branches/qpid-2935/qpid/cpp/INSTALL
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py
    qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
    qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/configure.ac
    qpid/branches/qpid-2935/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Agent.h   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Console.h   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt   (contents, props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
    qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp   (contents, props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/tests/CMakeLists.txt
    qpid/branches/qpid-2935/qpid/cpp/src/tests/MessagingSessionTests.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk
    qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_test.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py   (props changed)
    qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-send.cpp
    qpid/branches/qpid-2935/qpid/cpp/xml/cluster.xml
    qpid/branches/qpid-2935/qpid/dotnet/build-msbuild.bat   (props changed)
    qpid/branches/qpid-2935/qpid/dotnet/build-nant-release   (props changed)
    qpid/branches/qpid-2935/qpid/dotnet/build-nant.bat   (props changed)
    qpid/branches/qpid-2935/qpid/extras/qmf/src/py/qmf/console.py
    qpid/branches/qpid-2935/qpid/extras/sasl/m4/ac_pkg_swig.m4
    qpid/branches/qpid-2935/qpid/java/   (props changed)
    qpid/branches/qpid-2935/qpid/java/broker/   (props changed)
    qpid/branches/qpid-2935/qpid/java/broker/bin/   (props changed)
    qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management/   (props changed)
    qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/   (props changed)
    qpid/branches/qpid-2935/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/branches/qpid-2935/qpid/java/common.xml
    qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/branches/qpid-2935/qpid/java/management/client/src/main/java/org/apache/qpid/management/   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/client/src/test/java/org/apache/qpid/management/   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc   (props changed)
    qpid/branches/qpid-2935/qpid/java/module.xml
    qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/08StandaloneExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/Excludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/JavaExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/JavaStandaloneExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/XAExcludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/clean-dir   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.async.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.cluster.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.noprefetch.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.excludes   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/default.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/java-derby.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/java.testprofile   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/log4j-test.xml   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/test-provider.properties   (props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/test_resources/   (props changed)
    qpid/branches/qpid-2935/qpid/java/tools/bin/perf_report.sh
    qpid/branches/qpid-2935/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
    qpid/branches/qpid-2935/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
    qpid/branches/qpid-2935/qpid/packaging/windows/   (props changed)
    qpid/branches/qpid-2935/qpid/python/   (props changed)
    qpid/branches/qpid-2935/qpid/python/examples/api/spout   (props changed)
    qpid/branches/qpid-2935/qpid/python/qpid/concurrency.py   (props changed)
    qpid/branches/qpid-2935/qpid/ruby/ext/sasl/extconf.rb   (props changed)
    qpid/branches/qpid-2935/qpid/specs/management-schema.xml
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
    qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py   (props changed)
    qpid/branches/qpid-2935/qpid/tools/src/py/.gitignore

Propchange: qpid/branches/qpid-2935/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
 /qpid/branches/0.6-release-windows-installer:926803
 /qpid/branches/0.6-release-windows-installer/qpid:926803,927233
 /qpid/branches/java-network-refactor/qpid:805429-825319
-/qpid/trunk/qpid:1061302-1068442
+/qpid/trunk/qpid:1061302-1071383

Modified: qpid/branches/qpid-2935/qpid/bin/release.sh
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/bin/release.sh?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/bin/release.sh (original)
+++ qpid/branches/qpid-2935/qpid/bin/release.sh Thu Feb 17 14:08:14 2011
@@ -208,7 +208,7 @@ fi
 
 if [ "JAVA" == "$JAVA" ] ; then
   pushd qpid-${VER}/java
-  ant build release release-bin release-mvn -Dsvnversion.output=${REV}
+  ant build release release-bin release-mvn -Dsvnversion.output=${REV} -Dmaven.snapshot=false
   popd
 
   cp qpid-${VER}/java/release/*.tar.gz  artifacts/qpid-java-${VER}.tar.gz

Modified: qpid/branches/qpid-2935/qpid/cpp/INSTALL
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/INSTALL?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/INSTALL (original)
+++ qpid/branches/qpid-2935/qpid/cpp/INSTALL Thu Feb 17 14:08:14 2011
@@ -67,6 +67,10 @@ Optional SSL support requires:
 * nss <http://www.mozilla.org/projects/security/pki/nss/>
 * nspr <http://www.mozilla.org/projects/nspr/>
 
+Optional binding support for ruby requires:
+* ruby and ruby devel <http://www.ruby-lang.org/en/>
+* swig <http://www.swig.org/>
+
 Qpid has been built using the GNU C++ compiler:
  * gcc     <http://gcc.gnu.org/>            		(3.4.6)
 
@@ -124,6 +128,9 @@ For the XML Exchange, include:
 
  # yum install xqilla-devel xerces-c-devel
 
+Optional ruby binding support include:
+ # yum install ruby ruby-devel swig
+
 Follow the manual installation instruction below for any packages not
 available through your distributions packaging tool.
 

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am Thu Feb 17 14:08:14 2011
@@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include
 
 AM_CPPFLAGS = $(INCLUDE)
 
-noinst_PROGRAMS=agent list_agents
+noinst_PROGRAMS=agent list_agents print_events
 
 agent_SOURCES=agent.cpp
 agent_LDADD=$(top_builddir)/src/libqmf2.la
@@ -29,3 +29,5 @@ agent_LDADD=$(top_builddir)/src/libqmf2.
 list_agents_SOURCES=list_agents.cpp
 list_agents_LDADD=$(top_builddir)/src/libqmf2.la
 
+print_events_SOURCES=print_events.cpp
+print_events_LDADD=$(top_builddir)/src/libqmf2.la

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py Thu Feb 17 14:08:14 2011
@@ -69,32 +69,41 @@ class ExampleAgent(AgentHandler):
     if addr == self.controlAddr:
       self.control.methodCount += 1
 
-      if methodName == "stop":
-        self.session.methodSuccess(handle)
-        self.cancel()
-
-      elif methodName == "echo":
-        handle.addReturnArgument("sequence", args["sequence"])
-        handle.addReturnArgument("map", args["map"])
-        self.session.methodSuccess(handle)
-
-      elif methodName == "fail":
-        if args['useString']:
-          self.session.raiseException(handle, args['stringVal'])
-        else:
-          ex = Data(self.sch_exception)
-          ex.whatHappened = "It Failed"
-          ex.howBad = 75
-          ex.details = args['details']
-          self.session.raiseException(handle, ex)
-
-      elif methodName == "create_child":
-        name = args['name']
-        child = Data(self.sch_child)
-        child.name = name
-        addr = self.session.addData(child, name)
-        handle.addReturnArgument("childAddr", addr.asMap())
-        self.session.methodSuccess(handle)
+      try:
+        if methodName == "stop":
+          self.session.methodSuccess(handle)
+          self.cancel()
+
+        elif methodName == "echo":
+          handle.addReturnArgument("sequence", args["sequence"])
+          handle.addReturnArgument("map", args["map"])
+          self.session.methodSuccess(handle)
+
+        elif methodName == "event":
+          ev = Data(self.sch_event)
+          ev.text = args['text']
+          self.session.raiseEvent(ev, args['severity'])
+          self.session.methodSuccess(handle)
+
+        elif methodName == "fail":
+          if args['useString']:
+            self.session.raiseException(handle, args['stringVal'])
+          else:
+            ex = Data(self.sch_exception)
+            ex.whatHappened = "It Failed"
+            ex.howBad = 75
+            ex.details = args['details']
+            self.session.raiseException(handle, ex)
+
+        elif methodName == "create_child":
+          name = args['name']
+          child = Data(self.sch_child)
+          child.name = name
+          addr = self.session.addData(child, name)
+          handle.addReturnArgument("childAddr", addr.asMap())
+          self.session.methodSuccess(handle)
+      except BaseException, e:
+        self.session.raiseException(handle, "%r" % e)
 
 
   def setupSchema(self):
@@ -128,6 +137,11 @@ class ExampleAgent(AgentHandler):
     echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT))
     self.sch_control.addMethod(echoMethod)
 
+    eventMethod = SchemaMethod("event", desc="Raise an Event")
+    eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN))
+    eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN))
+    self.sch_control.addMethod(eventMethod)
+
     failMethod = SchemaMethod("fail", desc="Expected to Fail")
     failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN))
     failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN))
@@ -146,11 +160,18 @@ class ExampleAgent(AgentHandler):
     self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING))
 
     ##
+    ## Declare the event class
+    ##
+    self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event")
+    self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING))
+
+    ##
     ## Register our schemata with the agent session.
     ##
     self.session.registerSchema(self.sch_exception)
     self.session.registerSchema(self.sch_control)
     self.session.registerSchema(self.sch_child)
+    self.session.registerSchema(self.sch_event)
 
 
   def populateData(self):

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb Thu Feb 17 14:08:14 2011
@@ -27,7 +27,7 @@ class FindAgents < Qmf2::ConsoleHandler
   end
 
   def agent_added(agent)
-    puts "Agent Added: #{agent.to_s}"
+    puts "Agent Added: #{agent.name}"
   end
 
   def agent_deleted(agent, reason)

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py Thu Feb 17 14:08:14 2011
@@ -165,7 +165,7 @@ class ConsoleHandler(Thread):
           reason = 'filter'
           if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED:
             reason = 'aged'
-          self.agentDeleted(Agent(event.getAgent(), reason))
+          self.agentDeleted(Agent(event.getAgent()), reason)
 
         elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART:
           self.agentRestarted(Agent(event.getAgent()))
@@ -373,6 +373,16 @@ class AgentSession(object):
     else:
       self._impl.raiseException(handle, data)
 
+  def raiseEvent(self, data, severity=None):
+    """
+    """
+    if not severity:
+      self._impl.raiseEvent(data._impl)
+    else:
+      if (severity.__class__ != int and severity.__class__ != long) or severity < 0 or severity > 7:
+        raise Exception("Severity must be an int between 0..7")
+      self._impl.raiseEvent(data._impl, severity);
+
 
 #===================================================================================================
 # AGENT PROXY

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb Thu Feb 17 14:08:14 2011
@@ -433,6 +433,14 @@ module Qmf2
     def del_data(addr)
       @impl.del_data(addr.impl)
     end
+
+    def raise_event(data, severity=nil)
+      if !severity
+        @impl.raiseEvent(data.impl)
+      else
+        @impl.raiseEvent(data.impl, severity)
+      end
+    end
   end
 
   ##==============================================================================

Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am Thu Feb 17 14:08:14 2011
@@ -17,10 +17,11 @@
 # under the License.
 #
 
+SUBDIRS = dotnet
+
 if HAVE_SWIG
 
 EXTRA_DIST = qpid.i
-SUBDIRS =
 
 if HAVE_RUBY_DEVEL
 SUBDIRS += ruby

Modified: qpid/branches/qpid-2935/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/configure.ac?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/configure.ac (original)
+++ qpid/branches/qpid-2935/qpid/cpp/configure.ac Thu Feb 17 14:08:14 2011
@@ -16,7 +16,7 @@ AC_INIT([qpidc],
 	[dev@qpid.apache.org])
 
 AC_CONFIG_AUX_DIR([build-aux])
-AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects])
+AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects tar-ustar])
 
 # Minimum Autoconf version required.
 AC_PREREQ(2.59)
@@ -538,6 +538,7 @@ AC_CONFIG_FILES([
   bindings/qpid/ruby/Makefile
   bindings/qpid/python/Makefile
   bindings/qpid/perl/Makefile
+  bindings/qpid/dotnet/Makefile
   bindings/qmf/Makefile
   bindings/qmf/ruby/Makefile
   bindings/qmf/python/Makefile

Propchange: qpid/branches/qpid-2935/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
 /qpid/branches/0.6-release-windows-installer/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803,927218,927233
 /qpid/branches/java-network-refactor/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:805429-825319
-/qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:1061302-1068442
+/qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:1061302-1071383

Propchange: qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Agent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:1061302-1068442
+/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:1061302-1071383

Propchange: qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Console.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:1061302-1068442
+/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:1061302-1071383

Modified: qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt Thu Feb 17 14:08:14 2011
@@ -313,6 +313,10 @@ if (NOT Boost_FILESYSTEM_LIBRARY)
   set(Boost_FILESYSTEM_LIBRARY boost_filesystem)
 endif (NOT Boost_FILESYSTEM_LIBRARY)
 
+if (NOT Boost_SYSTEM_LIBRARY)
+  set(Boost_SYSTEM_LIBRARY boost_system)
+endif (NOT Boost_SYSTEM_LIBRARY)
+
 if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
   set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework)
 endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
@@ -487,6 +491,7 @@ endif (BUILD_SASL)
 CHECK_LIBRARY_EXISTS (xerces-c _init "" HAVE_XERCES)
 CHECK_INCLUDE_FILE_CXX (xercesc/framework/MemBufInputSource.hpp HAVE_XERCES_H)
 CHECK_INCLUDE_FILE_CXX (xqilla/xqilla-simple.hpp HAVE_XQILLA_H)
+CHECK_INCLUDE_FILE_CXX (xqilla/ast/XQEffectiveBooleanValue.hpp HAVE_XQ_EBV)
 
 set (xml_default ${xml_force})
 if (CMAKE_SYSTEM_NAME STREQUAL Windows)
@@ -510,6 +515,10 @@ if (BUILD_XML)
     message(FATAL_ERROR "XML Exchange support requested but XQilla headers not found")
   endif (NOT HAVE_XQILLA_H)
 
+  if (HAVE_XQ_EBV)
+    add_definitions(-DXQ_EFFECTIVE_BOOLEAN_VALUE_HPP)
+  endif (HAVE_XQ_EBV)
+
   add_library (xml MODULE
                qpid/xml/XmlExchange.cpp
                qpid/xml/XmlExchange.h
@@ -956,6 +965,11 @@ set (qpidbroker_SOURCES
      qpid/broker/Broker.cpp
      qpid/broker/Exchange.cpp
      qpid/broker/ExpiryPolicy.cpp
+     qpid/broker/Fairshare.cpp
+     qpid/broker/LegacyLVQ.cpp
+     qpid/broker/MessageDeque.cpp
+     qpid/broker/MessageMap.cpp
+     qpid/broker/PriorityQueue.cpp
      qpid/broker/Queue.cpp
      qpid/broker/QueueCleaner.cpp
      qpid/broker/QueueListeners.cpp
@@ -1006,6 +1020,7 @@ set (qpidbroker_SOURCES
      qpid/broker/SessionHandler.h
      qpid/broker/SessionHandler.cpp
      qpid/broker/System.cpp
+     qpid/broker/ThresholdAlerts.cpp
      qpid/broker/TopicExchange.cpp
      qpid/broker/TxAccept.cpp
      qpid/broker/TxBuffer.cpp
@@ -1207,6 +1222,8 @@ install (TARGETS replication_exchange
 # file whereas older builds only have config.h on autoconf-generated builds.
 add_definitions(-DHAVE_CONFIG_H)
 
+add_definitions(-DBOOST_FILESYSTEM_VERSION=2)
+
 # Now create the config file from all the info learned above.
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
                ${CMAKE_CURRENT_BINARY_DIR}/config.h)

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
 /qpid/branches/0.6-release-windows-installer/cpp/src/CMakeLists.txt:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src/CMakeLists.txt:926803,927233,932132
 /qpid/branches/java-network-refactor/qpid/cpp/src/CMakeLists.txt:805429-825319
-/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1061302-1068442
+/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1061302-1071383

Modified: qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am Thu Feb 17 14:08:14 2011
@@ -24,6 +24,7 @@ SUBDIRS = . tests
 # using Visual Studio solutions/projects.
 windows_dist = \
   qpid/client/windows/SaslFactory.cpp \
+  qpid/client/windows/SslConnector.cpp \
   qpid/log/windows/SinkOptions.cpp \
   qpid/log/windows/SinkOptions.h \
   ../include/qpid/sys/windows/check.h \
@@ -42,6 +43,8 @@ windows_dist = \
   qpid/sys/windows/Shlib.cpp \
   qpid/sys/windows/SocketAddress.cpp \
   qpid/sys/windows/Socket.cpp \
+  qpid/sys/windows/SslAsynchIO.cpp \
+  qpid/sys/windows/SslAsynchIO.h \
   qpid/sys/windows/StrError.cpp \
   qpid/sys/windows/SystemInfo.cpp \
   qpid/sys/windows/Thread.cpp \
@@ -51,7 +54,9 @@ windows_dist = \
   qpid/sys/windows/uuid.h \
   windows/QpiddBroker.cpp \
   qpid/broker/windows/BrokerDefaults.cpp \
-  qpid/broker/windows/SaslAuthenticator.cpp
+  qpid/broker/windows/SaslAuthenticator.cpp \
+  qpid/broker/windows/SslProtocolFactory.cpp \
+  qpid/messaging/HandleInstantiator.cpp
 
 EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist)
 
@@ -122,6 +127,8 @@ qpidtest_SCRIPTS =
 tmoduledir = $(libdir)/qpid/tests
 tmodule_LTLIBRARIES=
 
+AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2
+
 ## Automake macros to build libraries and executables.
 qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\"
 libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
@@ -543,6 +550,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/ExpiryPolicy.cpp \
   qpid/broker/ExpiryPolicy.h \
+  qpid/broker/Fairshare.h \
+  qpid/broker/Fairshare.cpp \
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/FanOutExchange.h \
   qpid/broker/FedOps.h \
@@ -550,6 +559,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/HeadersExchange.h \
   qpid/broker/AsyncCompletion.h \
+  qpid/broker/LegacyLVQ.h \
+  qpid/broker/LegacyLVQ.cpp \
   qpid/broker/Link.cpp \
   qpid/broker/Link.h \
   qpid/broker/LinkRegistry.cpp \
@@ -560,9 +571,16 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.cpp \
   qpid/broker/MessageBuilder.h \
+  qpid/broker/MessageDeque.h \
+  qpid/broker/MessageDeque.cpp \
+  qpid/broker/MessageMap.h \
+  qpid/broker/MessageMap.cpp \
+  qpid/broker/Messages.h \
   qpid/broker/MessageStore.h \
   qpid/broker/MessageStoreModule.cpp \
   qpid/broker/MessageStoreModule.h \
+  qpid/broker/PriorityQueue.h \
+  qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
   qpid/broker/NullMessageStore.cpp \
@@ -584,6 +602,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueEvents.h \
   qpid/broker/QueueListeners.cpp \
   qpid/broker/QueueListeners.h \
+  qpid/broker/QueueObserver.h \
   qpid/broker/QueuePolicy.cpp \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.cpp \
@@ -632,6 +651,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/SignalHandler.h \
   qpid/broker/System.cpp \
   qpid/broker/System.h \
+  qpid/broker/ThresholdAlerts.cpp \
+  qpid/broker/ThresholdAlerts.h \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TopicExchange.h \
   qpid/broker/TransactionalStore.h \

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp Thu Feb 17 14:08:14 2011
@@ -339,7 +339,7 @@ void AgentImpl::handleMethodResponse(con
     uint32_t correlator;
     boost::shared_ptr<SyncContext> context;
 
-    QPID_LOG(trace, "RCVD MethodResponse map=" << response);
+    QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response);
 
     aIter = response.find("_arguments");
     if (aIter != response.end())
@@ -556,13 +556,14 @@ void AgentImpl::sendQuery(const Query& q
     msg.setReplyTo(session.replyAddress);
     msg.setCorrelationId(boost::lexical_cast<string>(correlator));
     msg.setSubject(directSubject);
-    if (!session.authUser.empty())
-        msg.setUserId(session.authUser);
+    string userId(session.connection.getAuthenticatedUsername());
+    if (!userId.empty())
+        msg.setUserId(userId);
     encode(QueryImplAccess::get(query).asMap(), msg);
-    if (sender.isValid())
+    if (sender.isValid()) {
         sender.send(msg);
-
-    QPID_LOG(trace, "SENT QueryRequest to=" << name);
+        QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator);
+    }
 }
 
 
@@ -583,13 +584,14 @@ void AgentImpl::sendMethod(const string&
     msg.setReplyTo(session.replyAddress);
     msg.setCorrelationId(boost::lexical_cast<string>(correlator));
     msg.setSubject(directSubject);
-    if (!session.authUser.empty())
-        msg.setUserId(session.authUser);
+    string userId(session.connection.getAuthenticatedUsername());
+    if (!userId.empty())
+        msg.setUserId(userId);
     encode(map, msg);
-    if (sender.isValid())
+    if (sender.isValid()) {
         sender.send(msg);
-
-    QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
+        QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator);
+    }
 }
 
 void AgentImpl::sendSchemaRequest(const SchemaId& id)
@@ -626,12 +628,13 @@ void AgentImpl::sendSchemaRequest(const 
     msg.setReplyTo(session.replyAddress);
     msg.setContent(content);
     msg.setSubject(directSubject);
-    if (!session.authUser.empty())
-        msg.setUserId(session.authUser);
-    if (sender.isValid())
+    string userId(session.connection.getAuthenticatedUsername());
+    if (!userId.empty())
+        msg.setUserId(userId);
+    if (sender.isValid()) {
         sender.send(msg);
-
-    QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
+        QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject);
+    }
 }
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp Thu Feb 17 14:08:14 2011
@@ -571,7 +571,7 @@ void AgentSessionImpl::raiseEvent(const 
     encode(list, msg);
     topicSender.send(msg);
 
-    QPID_LOG(trace, "SENT EventIndication to=" << subject);
+    QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
 }
 
 
@@ -625,7 +625,7 @@ void AgentSessionImpl::setAgentName()
 
 void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
 {
-    QPID_LOG(trace, "RCVD AgentLocateRequest");
+    QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());
 
     if (!predicate.empty()) {
         Query agentQuery(QUERY_OBJECT);
@@ -659,7 +659,7 @@ void AgentSessionImpl::handleLocateReque
 
 void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
 {
-    QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo());
+    QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
 
     //
     // Construct an AgentEvent to be sent to the application.
@@ -719,7 +719,7 @@ void AgentSessionImpl::handleMethodReque
 
 void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
 {
-    QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo());
+    QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
 
     //
     // Construct an AgentEvent to be sent to the application or directly handled by the agent.

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp Thu Feb 17 14:08:14 2011
@@ -65,7 +65,7 @@ Subscription ConsoleSession::subscribe(c
 //========================================================================================
 
 ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+    connection(c), domain("default"), maxAgentAgeMinutes(5),
     opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
     connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
 {
@@ -394,6 +394,7 @@ void ConsoleSessionImpl::sendAgentLocate
 {
     Message msg;
     Variant::Map& headers(msg.getProperties());
+    static const string subject("console.request.agent_locate");
 
     headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
     headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
@@ -401,12 +402,12 @@ void ConsoleSessionImpl::sendAgentLocate
 
     msg.setReplyTo(replyAddress);
     msg.setCorrelationId("agent-locate");
-    msg.setSubject("console.request.agent_locate");
+    msg.setSubject(subject);
     encode(agentQuery.getPredicate(), msg);
 
     topicSender.send(msg);
 
-    QPID_LOG(trace, "SENT AgentLocate to topic");
+    QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
 }
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h Thu Feb 17 14:08:14 2011
@@ -72,7 +72,6 @@ namespace qmf {
         qpid::messaging::Sender directSender;
         qpid::messaging::Sender topicSender;
         std::string domain;
-        std::string authUser;
         uint32_t maxAgentAgeMinutes;
         bool listenOnDirect;
         bool strictSecurity;

Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1068442
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1071383

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h Thu Feb 17 14:08:14 2011
@@ -27,7 +27,7 @@
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
-// FIXME aconway 2008-09-06: easy to add alignment
+
 /**
  * Reference-counted byte buffer.
  * No alignment guarantees.
@@ -51,7 +51,7 @@ public:
         pointer(const pointer&);
         ~pointer();
         pointer& operator=(const pointer&);
-        
+
         char* get() { return cp(); }
         operator char*() { return cp(); }
         char& operator*() { return *cp(); }
@@ -62,7 +62,7 @@ public:
         const char& operator*() const { return *cp(); }
         const char& operator[](size_t i) const { return cp()[i]; }
     };
-    
+
     /** Create a reference counted buffer of size n */
     static pointer create(size_t n);
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Thu Feb 17 14:08:14 2011
@@ -103,7 +103,8 @@ Broker::Options::Options(const std::stri
     requireEncrypted(false),
     maxSessionRate(0),
     asyncQueueEvents(false),     // Must be false in a cluster.
-    qmf2Support(false),
+    qmf2Support(true),
+    qmf1Support(true),
     queueFlowStopRatio(80),
     queueFlowResumeRatio(70)
 {
@@ -125,7 +126,8 @@ Broker::Options::Options(const std::stri
         ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
-        ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
+        ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
+        ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
         ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
          "Interval between attempts to purge any expired messages from queues")
@@ -153,7 +155,7 @@ const std::string knownHostsNone("none")
 Broker::Broker(const Broker::Options& conf) :
     poller(new Poller),
     config(conf),
-    managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+    managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
                                                           conf.qmf2Support)
                                     : 0),
     store(new NullMessageStore),
@@ -225,7 +227,6 @@ Broker::Broker(const Broker::Options& co
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
-    queues.setQueueEvents(&queueEvents);
 
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Thu Feb 17 14:08:14 2011
@@ -115,6 +115,7 @@ public:
         uint32_t maxSessionRate;
         bool asyncQueueEvents;
         bool qmf2Support;
+        bool qmf1Support;
         uint queueFlowStopRatio;    // producer flow control: on
         uint queueFlowResumeRatio;  // producer flow control: off
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp Thu Feb 17 14:08:14 2011
@@ -400,22 +400,6 @@ bool Message::hasExpired()
     return expiryPolicy && expiryPolicy->hasExpired(*this);
 }
 
-boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
-{
-    sys::Mutex::ScopedLock l(lock);
-    Replacement::iterator i = replacement.find(qfor);
-    if (i != replacement.end()){
-        return i->second;
-    }           
-    return empty;
-}
-
-void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
-{
-    sys::Mutex::ScopedLock l(lock);
-    replacement[qfor] = msg;
-}
-
 namespace {
 struct ScopedSet {
     sys::Monitor& lock;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h Thu Feb 17 14:08:14 2011
@@ -153,8 +153,6 @@ public:
        void forcePersistent();
        bool isForcedPersistent();
     
-    boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
-    void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
@@ -163,8 +161,6 @@ public:
     uint8_t getPriority() const;
 
   private:
-    typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
     MessageAdapter& getAdapter() const;
     void allDequeuesComplete();
 
@@ -183,7 +179,6 @@ public:
 
     static TransferAdapter TRANSFER;
 
-    mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 17 14:08:14 2011
@@ -23,11 +23,16 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
 #include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
@@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max
 const std::string qpidNoLocal("no-local");
 const std::string qpidTraceIdentity("qpid.trace.id");
 const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
 const std::string qpidLastValueQueue("qpid.last_value_queue");
 const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
 const std::string qpidPersistLastNode("qpid.persist_last_node");
 const std::string qpidVQMatchProperty("qpid.LVQ_key");
 const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
 //following feature is not ready for general use as it doesn't handle
 //the case where a message is enqueued on more than one queue well enough:
 const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _
     consumerCount(0),
     exclusive(0),
     noLocal(false),
-    lastValueQueue(false),
-    lastValueQueueNoBrowse(false),
     persistLastNode(false),
     inLastNodeFailure(false),
+    messages(new MessageDeque()),
     persistenceId(0),
     policyExceeded(false),
     mgmtObject(0),
     eventMode(0),
-    eventMgr(0),
     insertSeqNo(0),
     broker(b),
     deleted(false),
-    barrier(*this)
+    barrier(*this),
+    autoDeleteTimeout(0)
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr
     } else {
         enqueue(0, msg);
         push(msg);
-        mgntEnqStats(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr
         msg->addToSyncList(shared_from_this(), store); 
     }
     msg->enqueueComplete(); // mark the message as enqueued
-    mgntEnqStats(msg);
 
     if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
         //content has not been loaded, need to ensure that lazy loading mode is set:
@@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr
 
 void Queue::process(boost::intrusive_ptr<Message>& msg){
     push(msg);
-    mgntEnqStats(msg);
     if (mgmtObject != 0){
         mgmtObject->inc_msgTxnEnqueues ();
         mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage&
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
-        messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+        messages->reinsert(msg);
         listeners.populate(copy);
 
         // for persistLastNode - don't force a message twice to disk, but force it if no force before 
@@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage&
     copy.notify();
 }
 
-void Queue::clearLVQIndex(const QueuedMessage& msg){
-    assertClusterSafe();
-    const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
-    if (lastValueQueue && ft){
-        string key = ft->getAsString(qpidVQMatchProperty);
-        lvq.erase(key);
-    }
-}
-
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
 {
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
-    
-    Messages::iterator i = findAt(position); 
-    if (i != messages.end() ) {
-        message = *i;
-        if (lastValueQueue) {
-            clearLVQIndex(*i);
-        }
-        QPID_LOG(debug,
-                 "Acquired message at " << i->position << " from " << name);
-        messages.erase(i);
+    if (messages->remove(position, message)) {
+        QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         return true;
-    } 
-    QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
-    return false;
+    } else {
+        QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+        return false;
+    }
 }
 
 bool Queue::acquire(const QueuedMessage& msg) {
-    Mutex::ScopedLock locker(messageLock);
-    assertClusterSafe();
-
-    QPID_LOG(debug, "attempting to acquire " << msg.position);
-    Messages::iterator i = findAt(msg.position); 
-    if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
-        (!lastValueQueue ||
-        (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
-        )  {
-
-        clearLVQIndex(msg);
-        QPID_LOG(debug,
-                 "Match found, acquire succeeded: " <<
-                 i->position << " == " << msg.position);
-        messages.erase(i);
-        return true;
-    } 
-    
-    QPID_LOG(debug, "Acquire failed for " << msg.position);
-    return false;
+    QueuedMessage copy = msg;
+    return acquireMessageAt(msg.position, copy);
 }
 
 void Queue::notifyListener()
@@ -282,7 +251,7 @@ void Queue::notifyListener()
     QueueListeners::NotificationSet set;
     {
         Mutex::ScopedLock locker(messageLock);
-        if (messages.size()) {
+        if (messages->size()) {
             listeners.populate(set);
         }
     }
@@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMes
 {
     while (true) {
         Mutex::ScopedLock locker(messageLock);
-        if (messages.empty()) { 
+        if (messages->empty()) { 
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
         } else {
-            QueuedMessage msg = getFront();
+            QueuedMessage msg = messages->front();
             if (msg.payload->hasExpired()) {
                 QPID_LOG(debug, "Message expired from queue '" << name << "'");
                 popAndDequeue();
@@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMes
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {            
                     m = msg;
-                    popMsg(msg);
+                    pop();
                     return CONSUMED;
                 } else {
                     //message(s) are available but consumer hasn't got enough credit
@@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMess
                 //consumer wants the message
                 c->position = msg.position;
                 m = msg;
-                if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
-                if (lastValueQueue) {
-                    boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-                    if (replacement.get()) m.payload = replacement;
-                }
                 return true;
             } else {
                 //browser hasn't got enough credit for the message
@@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::sha
     {
         Mutex::ScopedLock locker(messageLock);
         listeners.removeListener(c);
-        if (messages.size()) {
+        if (messages->size()) {
             listeners.populate(set);
         }
     }
@@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_pt
 // Find the next message 
 bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
-    if (!messages.empty() && messages.back().position > c->position) {
-        if (c->position < getFront().position) {
-            msg = getFront();
-            return true;
-        } else {        
-            Messages::iterator pos = findAt(c->position);
-            if (pos != messages.end() && pos+1 != messages.end()) {
-                msg = *(pos+1);
-                return true;
-            }
-        }
+    if (messages->next(c->position, msg)) {
+        return true;
+    } else {
+        listeners.addListener(c);
+        return false;
     }
-    listeners.addListener(c);
-    return false;
-}
-
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
-    if(!messages.empty()){
-        QueuedMessage compM;
-        compM.position = pos;
-        unsigned long diff = pos.getValue() - messages.front().position.getValue();
-        long maxEnd = diff < messages.size()? diff : messages.size();
-
-        Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
-        if (i!= messages.end() && i->position == pos)
-            return i;
-    }    
-    return messages.end(); // no match found.
 }
 
-
 QueuedMessage Queue::find(SequenceNumber pos) const {
 
     Mutex::ScopedLock locker(messageLock);
-    if(!messages.empty()){
-        QueuedMessage compM;
-        compM.position = pos;
-        unsigned long diff = pos.getValue() - messages.front().position.getValue();
-        long maxEnd = diff < messages.size()? diff : messages.size();
-
-        Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); 
-        if (i != messages.end())
-            return *i;
-    }
-    return QueuedMessage();
+    QueuedMessage msg;
+    messages->find(pos, msg);
+    return msg;
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr
     consumerCount++;
     if (mgmtObject != 0)
         mgmtObject->inc_consumerCount ();
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
@@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr 
 QueuedMessage Queue::get(){
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
+    messages->pop(msg);
+    return msg;
+}
 
-    if(!messages.empty()){
-        msg = getFront();
-        popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+    if (message.payload->hasExpired()) {
+        expired.push_back(message);
+        return true;
+    } else {
+        return false;
     }
-    return msg;
 }
 
 void Queue::purgeExpired()
@@ -492,37 +434,11 @@ void Queue::purgeExpired()
     //bother explicitly expiring if the rate of dequeues since last
     //attempt is less than one per second.  
 
-    //Note: This method is currently called periodically on the timer
-    //thread. In a clustered broker this means that the purging does
-    //not occur on the cluster event dispatch thread and consequently
-    //that is not totally ordered w.r.t other events (including
-    //publication of messages). However the cluster does ensure that
-    //the actual expiration of messages (as distinct from the removing
-    //of those expired messages from the queue) *is* consistently
-    //ordered w.r.t. cluster events. This means that delivery of
-    //messages is in general consistent across the cluster inspite of
-    //any non-determinism in the triggering of a purge. However at
-    //present purging a last value queue could potentially cause
-    //inconsistencies in the cluster (as the order w.r.t publications
-    //can affect the order in which messages appear in the
-    //queue). Consequently periodic purging of an LVQ is not enabled
-    //(expired messages will be removed on delivery and consolidated
-    //by key as part of normal LVQ operation).
-
-    if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
-        Messages expired;
+    if (dequeueTracker.sampleRatePerSecond() < 1) {
+        std::deque<QueuedMessage> expired;
         {
             Mutex::ScopedLock locker(messageLock);
-            for (Messages::iterator i = messages.begin(); i != messages.end();) {
-                //Re-introduce management of LVQ-specific state here
-                //if purging is renabled for that case (see note above)
-                if (i->payload->hasExpired()) {
-                    expired.push_back(*i);
-                    i = messages.erase(i);
-                } else {
-                    ++i;
-                }
-            }
+            messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
         for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
@@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t pur
 
     uint32_t count = 0;
     // Either purge them all or just the some (purge_count) while the queue isn't empty.
-    while((!purge_request || purge_count--) && !messages.empty()) {
+    while((!purge_request || purge_count--) && !messages->empty()) {
         if (dest.get()) {
             //
             // If there is a destination exchange, stage the messages onto a reroute queue
             // so they don't wind up getting purged more than once.
             //
-            DeliverableMessage msg(getFront().payload);
+            DeliverableMessage msg(messages->front().payload);
             rerouteQueue.push_back(msg);
         }
         popAndDequeue();
@@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared
     uint32_t move_count = qty; // only comes into play if  qty >0 
     uint32_t count = 0; // count how many were moved for returning
 
-    while((!qty || move_count--) && !messages.empty()) {
-        QueuedMessage qmsg = getFront();
+    while((!qty || move_count--) && !messages->empty()) {
+        QueuedMessage qmsg = messages->front();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
-        popMsg(qmsg);
+        pop();
         dequeue(0, qmsg);
         count++;
     }
     return count;
 }
 
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
 {
     assertClusterSafe();
-    const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
-    if (lastValueQueue && ft){
-        string key = ft->getAsString(qpidVQMatchProperty);
-        lvq.erase(key);
-    }
-    messages.pop_front();
+    messages->pop();
     ++dequeueTracker;
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
+    QueuedMessage removed;
+    bool dequeueRequired = false;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
-        LVQ::iterator i;
-        const framing::FieldTable* ft = msg->getApplicationHeaders();
-        if (lastValueQueue && ft){
-            string key = ft->getAsString(qpidVQMatchProperty);
-
-            i = lvq.find(key);
-            if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
-                messages.push_back(qm);
-                listeners.populate(copy);
-                lvq[key] = msg; 
-            }else {
-                boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
-                if (!old) old = i->second;
-                i->second->setReplacementMessage(msg,this);
-                if (isRecovery) {
-                    //can't issue new requests for the store until
-                    //recovery is complete
-                    pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
-                } else {
-                    Mutex::ScopedUnlock u(messageLock);   
-                    dequeue(0, QueuedMessage(qm.queue, old, qm.position));
-                }
-            }           
-        }else {
-            messages.push_back(qm);
-            listeners.populate(copy);
-        }
-        if (eventMode) {
-            if (eventMgr) eventMgr->enqueued(qm);
-            else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
-        }
-        if (policy.get()) {
-            policy->enqueued(qm);
-        }
-        if (flowLimit.get())
-            flowLimit->enqueued(qm);
+        dequeueRequired = messages->push(qm, removed);
+        listeners.populate(copy);
+        enqueued(qm);
     }
     copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
-    QueuedMessage msg = messages.front();
-    if (lastValueQueue) {
-        boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-        if (replacement.get()) msg.payload = replacement;
+    if (dequeueRequired) {
+        if (isRecovery) {
+            //can't issue new requests for the store until
+            //recovery is complete
+            pendingDequeues.push_back(removed);
+        } else {
+            dequeue(0, removed);
+        }
     }
-    return msg;
 }
 
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
 {
-    boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
-    if (replacement.get()) {
-        const framing::FieldTable* ft = replacement->getApplicationHeaders();
-        if (ft) {
-            string key = ft->getAsString(qpidVQMatchProperty);
-            if (lvq.find(key) != lvq.end()){
-                lvq[key] = replacement; 
-            }        
-        }
-        msg.payload = replacement;
-    }
-    return msg;
+    if (message.payload->isIngressComplete()) (*result)++;
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessag
 {
     Mutex::ScopedLock locker(messageLock);
     uint32_t count = 0;
-    for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
-        //NOTE: don't need to use checkLvqReplace() here as it
-        //is only relevant for LVQ which does not support persistence
-        //so the enqueueComplete check has no effect
-        if ( i->payload->isIngressComplete() ) count ++;
-    }
-    
+    messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
     return count;
 }
 
 uint32_t Queue::getMessageCount() const
 {
     Mutex::ScopedLock locker(messageLock);
-    return messages.size();
+    return messages->size();
 }
 
 uint32_t Queue::getConsumerCount() const
@@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const
 bool Queue::canAutoDelete() const
 {
     Mutex::ScopedLock locker(consumerLock);
-    return autodelete && !consumerCount;
+    return autodelete && !consumerCount && !owner;
 }
 
 void Queue::clearLastNodeFailure()
@@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure()
     inLastNodeFailure = false;
 }
 
+void Queue::forcePersistent(QueuedMessage& message)
+{
+    if(!message.payload->isStoredOnQueue(shared_from_this())) {
+        message.payload->forcePersistent();
+        if (message.payload->isForcedPersistent() ){
+            enqueue(0, message.payload);
+        }
+    }
+}
+
 void Queue::setLastNodeFailure()
 {
     if (persistLastNode){
         Mutex::ScopedLock locker(messageLock);
         try {
-    	    for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
-                if (lastValueQueue) checkLvqReplace(*i);
-                // don't force a message twice to disk.
-                if(!i->payload->isStoredOnQueue(shared_from_this())) {
-                    i->payload->forcePersistent();
-                    if (i->payload->isForcedPersistent() ){
-            	        enqueue(0, i->payload);
-                    }
-                }
-    	    }
+            messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
         } catch (const std::exception& e) {
             // Could not go into last node standing (for example journal not large enough)
             QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* 
     if (!u.acquired) return false;
 
     if (policy.get() && !suppressPolicyCheck) {
-        Messages dequeues;
+        std::deque<QueuedMessage> dequeues;
         {
             Mutex::ScopedLock locker(messageLock);
             policy->tryEnqueue(msg);
@@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const Queue
  */
 void Queue::popAndDequeue()
 {
-    QueuedMessage msg = getFront();
-    popMsg(msg);
+    QueuedMessage msg = messages->front();
+    pop();
     dequeue(0, msg);
 }
 
@@ -845,11 +708,16 @@ void Queue::popAndDequeue()
 void Queue::dequeued(const QueuedMessage& msg)
 {
     if (policy.get()) policy->dequeued(msg);
+    /** todo KAG make flowLimit an observer */
     if (flowLimit.get())
         flowLimit->dequeued(msg);
     mgntDeqStats(msg.payload);
-    if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
-        eventMgr->dequeued(msg);
+    for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+        try{
+            (*i)->dequeued(msg);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+        }
     }
 }
 
@@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _se
     configure(_settings);
 }
 
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+    qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+    if (!v) {
+        return 0;
+    } else if (v->convertsTo<int>()) {
+        return v->get<int>();
+    } else if (v->convertsTo<std::string>()){
+        std::string s = v->get<std::string>();
+        try { 
+            return boost::lexical_cast<int>(s); 
+        } catch(const boost::bad_lexical_cast&) {
+            QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+            return 0;
+        }
+    } else {
+        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+        return 0;
+    }
+}
+
 void Queue::configure(const FieldTable& _settings, bool recovering)
 {
 
     eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+    if (eventMode && broker) {
+        broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+    }
 
     if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
-        (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+        (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
-        } else if (eventMgr && !eventMgr->isSync() ) {
+        } else if (broker && !(broker->getQueueEvents().isSync()) ) {
             QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
         }
         FieldTable copy(_settings);
@@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& 
     } else {
         setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
     }
+    if (broker && broker->getManagementAgent()) {
+        ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+    }
+
     //set this regardless of owner to allow use of no-local with exclusive consumers also
     noLocal = _settings.get(qpidNoLocal);
     QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
 
-    lastValueQueue= _settings.get(qpidLastValueQueue);
-    if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
-    lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
-    if (lastValueQueueNoBrowse){
-        QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
-        lastValueQueue = lastValueQueueNoBrowse;
+    std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+    if (lvqKey.size()) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
+        messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+    } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
+        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+    } else if (_settings.get(qpidLastValueQueue)) {
+        QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
+        messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+    } else {
+        std::auto_ptr<Messages> m = Fairshare::create(_settings);
+        if (m.get()) {
+            messages = m;
+            QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
+        }
     }
     
     persistLastNode= _settings.get(qpidPersistLastNode);
@@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& 
 
     flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
 
+    autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+    if (autoDeleteTimeout) 
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
+
     if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
         if (flowLimit.get())
@@ -924,8 +834,8 @@ void Queue::destroy()
 {
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
-        while(!messages.empty()){
-            DeliverableMessage msg(getFront().payload);
+        while(!messages->empty()){
+            DeliverableMessage msg(messages->front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      msg.getMessage().getApplicationHeaders());
             popAndDequeue();
@@ -939,6 +849,7 @@ void Queue::destroy()
         store->destroy(*this);
         store = 0;//ensure we make no more calls to the store for this queue
     }
+    if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
 }
 
 void Queue::notifyDeleted()
@@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAl
     return alternateExchange;
 }
 
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
     if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+        QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->unbind(broker.getExchanges(), queue);
         queue->destroy();
     }
 }
 
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+    Broker& broker;
+    Queue::shared_ptr queue;
+
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+    void fire()
+    {
+        //need to detect case where queue was used after the task was
+        //created, but then became unused again before the task fired;
+        //in this case ignore this request as there will have already
+        //been a later task added
+        tryAutoDeleteImpl(broker, queue);
+    }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+    if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+        AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+        queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+    } else {
+        tryAutoDeleteImpl(broker, queue);
+    }
+}
+
 bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
 { 
     Mutex::ScopedLock locker(ownershipLock);
@@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership() 
 
 bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
 { 
+    //reset auto deletion timer if necessary
+    if (autoDeleteTimeout && autoDeleteTask) {
+        autoDeleteTask->cancel();
+    }
     Mutex::ScopedLock locker(ownershipLock);
     if (owner) {
         return false;
@@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() {
 
 int Queue::getEventMode() { return eventMode; }
 
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
-    eventMgr = &mgr;
-}
-
 void Queue::recoveryComplete(ExchangeRegistry& exchanges)
 {
     // set the alternate exchange
@@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const 
 
 void Queue::enqueued(const QueuedMessage& m)
 {
-    if (m.payload) {
-        if (policy.get()) {
-            policy->recoverEnqueued(m.payload);
-            policy->enqueued(m);
+    for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+        try {
+            (*i)->enqueued(m);
+        } catch (const std::exception& e) {
+            QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
         }
-        if (flowLimit.get())
-            flowLimit->enqueued(m);
-        mgntEnqStats(m.payload);
+    }
+    if (policy.get()) {
+        policy->enqueued(m);
+    }
+    /** todo make flowlimit an observer */
+    if (flowLimit.get())
+        flowLimit->enqueued(m);
+    mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+    if (m.payload) {
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue ( 0, payload, true );
+        if (policy.get()) {
+            policy->recoverEnqueued(payload);
+        }
+        enqueued(m);
     } else {
         QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
     }
@@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessa
 }
 
 QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
 
 void Queue::checkNotDeleted()
 {
@@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted()
     }
 }
 
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+    observers.insert(observer);
+}
+
 void Queue::flush()
 {
     ScopedUse u(barrier);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 17 14:08:14 2011
@@ -26,14 +26,17 @@
 #include "qpid/broker/OwnershipToken.h"
 #include "qpid/broker/Consumer.h"
 #include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
 #include "qpid/broker/PersistableQueue.h"
 #include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueBindings.h"
 #include "qpid/broker/QueueListeners.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
 #include "qpid/framing/amqp_types.h"
@@ -46,6 +49,7 @@
 #include <vector>
 #include <memory>
 #include <deque>
+#include <set>
 #include <algorithm>
 
 namespace qpid {
@@ -86,10 +90,10 @@ class Queue : public boost::enable_share
         ~ScopedUse() { if (acquired) barrier.release(); }
     };
             
-    typedef std::deque<QueuedMessage> Messages;
-    typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
+    typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
     enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
 
+
     const std::string name;
     const bool autodelete;
     MessageStore* store;
@@ -97,16 +101,13 @@ class Queue : public boost::enable_share
     uint32_t consumerCount;
     OwnershipToken* exclusive;
     bool noLocal;
-    bool lastValueQueue;
-    bool lastValueQueueNoBrowse;
     bool persistLastNode;
     bool inLastNodeFailure;
     std::string traceId;
     std::vector<std::string> traceExclude;
     QueueListeners listeners;
-    Messages messages;
-    Messages pendingDequeues;//used to avoid dequeuing during recovery
-    LVQ lvq;
+    std::auto_ptr<Messages> messages;
+    std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
     mutable qpid::sys::Mutex consumerLock;
     mutable qpid::sys::Monitor messageLock;
     mutable qpid::sys::Mutex ownershipLock;
@@ -122,12 +123,14 @@ class Queue : public boost::enable_share
     qmf::org::apache::qpid::broker::Queue* mgmtObject;
     RateTracker dequeueTracker;
     int eventMode;
-    QueueEvents* eventMgr;
+    Observers observers;
     bool insertSeqNo;
     std::string seqNoKey;
     Broker* broker;
     bool deleted;
     UsageBarrier barrier;
+    int autoDeleteTimeout;
+    boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -141,12 +144,13 @@ class Queue : public boost::enable_share
 
     bool isExcluded(boost::intrusive_ptr<Message>& msg);
 
+    void enqueued(const QueuedMessage& msg);
     void dequeued(const QueuedMessage& msg);
-    void popMsg(QueuedMessage& qmsg);
+    void pop();
     void popAndDequeue();
     QueuedMessage getFront();
-    QueuedMessage& checkLvqReplace(QueuedMessage& msg);
-    void clearLVQIndex(const QueuedMessage& msg);
+    void forcePersistent(QueuedMessage& msg);
+    int getEventMode();
 
     inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
     {
@@ -171,7 +175,6 @@ class Queue : public boost::enable_share
         }
     }
             
-    Messages::iterator findAt(framing::SequenceNumber pos);
     void checkNotDeleted();
 
   public:
@@ -277,7 +280,7 @@ class Queue : public boost::enable_share
      * thus are still logically on the queue) - used in
      * clustered broker.  
      */ 
-    void enqueued(const QueuedMessage& msg);
+    void updateEnqueued(const QueuedMessage& msg);
 
     /**
      * Test whether the specified message (identified by its
@@ -322,13 +325,7 @@ class Queue : public boost::enable_share
     /** Apply f to each Message on the queue. */
     template <class F> void eachMessage(F f) {
         sys::Mutex::ScopedLock l(messageLock);
-        if (lastValueQueue) {
-            for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
-                f(checkLvqReplace(*i));
-            }
-        } else {
-            std::for_each(messages.begin(), messages.end(), f);
-        }
+        messages->foreach(f);
     }
 
     /** Apply f to each QueueBinding on the queue */
@@ -344,8 +341,7 @@ class Queue : public boost::enable_share
     /** return current position sequence number for the next message on the queue.
      */
     QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
-    int getEventMode();
-    void setQueueEventManager(QueueEvents&);
+    void addObserver(boost::shared_ptr<QueueObserver>);
     QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
     /**
      * Notify queue that recovery has completed.
@@ -354,6 +350,8 @@ class Queue : public boost::enable_share
 
     // For cluster update
     QueueListeners& getListeners();
+    Messages& getMessages();
+    const Messages& getMessages() const;
 
     /**
      * Reserve space in policy for an enqueued message that

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp Thu Feb 17 14:08:14 2011
@@ -19,6 +19,8 @@
  *
  */
 #include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 
@@ -115,6 +117,29 @@ bool QueueEvents::isSync()
     return sync;
 }
 
+class EventGenerator : public QueueObserver
+{
+  public:
+    EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {}
+    void enqueued(const QueuedMessage& m)
+    {
+        manager.enqueued(m);
+    }
+    void dequeued(const QueuedMessage& m)
+    {
+        if (!enqueueOnly) manager.dequeued(m);
+    }
+  private:
+    QueueEvents& manager;
+    const bool enqueueOnly;
+};
+
+void QueueEvents::observe(Queue& queue, bool enqueueOnly)
+{
+    boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly));
+    queue.addObserver(observer);
+}
+
 
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h Thu Feb 17 14:08:14 2011
@@ -63,6 +63,7 @@ class QueueEvents
     QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
     void enable();
     void disable();
+    void observe(Queue&, bool enqueueOnly);
     //process all outstanding events
     QPID_BROKER_EXTERN void shutdown();
     QPID_BROKER_EXTERN bool isSync();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org