You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/17 15:08:18 UTC
svn commit: r1071615 [1/3] - in /qpid/branches/qpid-2935/qpid: ./ bin/ cpp/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/examples/python/
cpp/bindings/qmf2/examples/ruby/ cpp/bindings/qmf2/python/
cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/ cpp/bin...
Author: kgiusti
Date: Thu Feb 17 14:08:14 2011
New Revision: 1071615
URL: http://svn.apache.org/viewvc?rev=1071615&view=rev
Log:
QPID-2935: sync with latest trunk
Added:
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/dotnet/Makefile.am
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/bindings/qpid/dotnet/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/build-aux/.gitignore
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/build-aux/.gitignore
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Fairshare.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Fairshare.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/LegacyLVQ.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageDeque.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageDeque.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageMap.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/MessageMap.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Messages.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PriorityQueue.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueObserver.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/QueueObserver.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
- copied unchanged from r1071383, qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
- copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
- copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
- copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
- copied unchanged from r1071383, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
Removed:
qpid/branches/qpid-2935/qpid/java/common/src/main/resources/
qpid/branches/qpid-2935/qpid/java/systests/src/old_test/
qpid/branches/qpid-2935/qpid/java/tools/etc/jndi.properties
Modified:
qpid/branches/qpid-2935/qpid/ (props changed)
qpid/branches/qpid-2935/qpid/bin/release.sh
qpid/branches/qpid-2935/qpid/cpp/INSTALL
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py
qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/configure.ac
qpid/branches/qpid-2935/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj (props changed)
qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Agent.h (props changed)
qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Console.h (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt (contents, props changed)
qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueRegistry.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Connection.h
qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (contents, props changed)
qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/tests/CMakeLists.txt
qpid/branches/qpid-2935/qpid/cpp/src/tests/MessagingSessionTests.cpp
qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster.mk
qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_test.cpp
qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py (props changed)
qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-cpp-benchmark
qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-receive.cpp
qpid/branches/qpid-2935/qpid/cpp/src/tests/qpid-send.cpp
qpid/branches/qpid-2935/qpid/cpp/xml/cluster.xml
qpid/branches/qpid-2935/qpid/dotnet/build-msbuild.bat (props changed)
qpid/branches/qpid-2935/qpid/dotnet/build-nant-release (props changed)
qpid/branches/qpid-2935/qpid/dotnet/build-nant.bat (props changed)
qpid/branches/qpid-2935/qpid/extras/qmf/src/py/qmf/console.py
qpid/branches/qpid-2935/qpid/extras/sasl/m4/ac_pkg_swig.m4
qpid/branches/qpid-2935/qpid/java/ (props changed)
qpid/branches/qpid-2935/qpid/java/broker/ (props changed)
qpid/branches/qpid-2935/qpid/java/broker/bin/ (props changed)
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ (props changed)
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ (props changed)
qpid/branches/qpid-2935/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/branches/qpid-2935/qpid/java/common.xml
qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/branches/qpid-2935/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
qpid/branches/qpid-2935/qpid/java/management/client/src/main/java/org/apache/qpid/management/ (props changed)
qpid/branches/qpid-2935/qpid/java/management/client/src/test/java/org/apache/qpid/management/ (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/ (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java (props changed)
qpid/branches/qpid-2935/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc (props changed)
qpid/branches/qpid-2935/qpid/java/module.xml
qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java (props changed)
qpid/branches/qpid-2935/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/ (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/08StandaloneExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/Excludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/JavaExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/JavaStandaloneExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/JavaTransientExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/XAExcludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/clean-dir (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.async.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.cluster.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.noprefetch.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.excludes (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/default.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/java-derby.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/java.testprofile (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/log4j-test.xml (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/test-provider.properties (props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/test_resources/ (props changed)
qpid/branches/qpid-2935/qpid/java/tools/bin/perf_report.sh
qpid/branches/qpid-2935/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
qpid/branches/qpid-2935/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
qpid/branches/qpid-2935/qpid/packaging/windows/ (props changed)
qpid/branches/qpid-2935/qpid/python/ (props changed)
qpid/branches/qpid-2935/qpid/python/examples/api/spout (props changed)
qpid/branches/qpid-2935/qpid/python/qpid/concurrency.py (props changed)
qpid/branches/qpid-2935/qpid/ruby/ext/sasl/extconf.rb (props changed)
qpid/branches/qpid-2935/qpid/specs/management-schema.xml
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py (props changed)
qpid/branches/qpid-2935/qpid/tools/src/py/.gitignore
Propchange: qpid/branches/qpid-2935/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
/qpid/branches/0.6-release-windows-installer:926803
/qpid/branches/0.6-release-windows-installer/qpid:926803,927233
/qpid/branches/java-network-refactor/qpid:805429-825319
-/qpid/trunk/qpid:1061302-1068442
+/qpid/trunk/qpid:1061302-1071383
Modified: qpid/branches/qpid-2935/qpid/bin/release.sh
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/bin/release.sh?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/bin/release.sh (original)
+++ qpid/branches/qpid-2935/qpid/bin/release.sh Thu Feb 17 14:08:14 2011
@@ -208,7 +208,7 @@ fi
if [ "JAVA" == "$JAVA" ] ; then
pushd qpid-${VER}/java
- ant build release release-bin release-mvn -Dsvnversion.output=${REV}
+ ant build release release-bin release-mvn -Dsvnversion.output=${REV} -Dmaven.snapshot=false
popd
cp qpid-${VER}/java/release/*.tar.gz artifacts/qpid-java-${VER}.tar.gz
Modified: qpid/branches/qpid-2935/qpid/cpp/INSTALL
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/INSTALL?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/INSTALL (original)
+++ qpid/branches/qpid-2935/qpid/cpp/INSTALL Thu Feb 17 14:08:14 2011
@@ -67,6 +67,10 @@ Optional SSL support requires:
* nss <http://www.mozilla.org/projects/security/pki/nss/>
* nspr <http://www.mozilla.org/projects/nspr/>
+Optional binding support for ruby requires:
+* ruby and ruby devel <http://www.ruby-lang.org/en/>
+* swig <http://www.swig.org/>
+
Qpid has been built using the GNU C++ compiler:
* gcc <http://gcc.gnu.org/> (3.4.6)
@@ -124,6 +128,9 @@ For the XML Exchange, include:
# yum install xqilla-devel xerces-c-devel
+Optional ruby binding support include:
+ # yum install ruby ruby-devel swig
+
Follow the manual installation instruction below for any packages not
available through your distributions packaging tool.
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am Thu Feb 17 14:08:14 2011
@@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include
AM_CPPFLAGS = $(INCLUDE)
-noinst_PROGRAMS=agent list_agents
+noinst_PROGRAMS=agent list_agents print_events
agent_SOURCES=agent.cpp
agent_LDADD=$(top_builddir)/src/libqmf2.la
@@ -29,3 +29,5 @@ agent_LDADD=$(top_builddir)/src/libqmf2.
list_agents_SOURCES=list_agents.cpp
list_agents_LDADD=$(top_builddir)/src/libqmf2.la
+print_events_SOURCES=print_events.cpp
+print_events_LDADD=$(top_builddir)/src/libqmf2.la
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/python/agent.py Thu Feb 17 14:08:14 2011
@@ -69,32 +69,41 @@ class ExampleAgent(AgentHandler):
if addr == self.controlAddr:
self.control.methodCount += 1
- if methodName == "stop":
- self.session.methodSuccess(handle)
- self.cancel()
-
- elif methodName == "echo":
- handle.addReturnArgument("sequence", args["sequence"])
- handle.addReturnArgument("map", args["map"])
- self.session.methodSuccess(handle)
-
- elif methodName == "fail":
- if args['useString']:
- self.session.raiseException(handle, args['stringVal'])
- else:
- ex = Data(self.sch_exception)
- ex.whatHappened = "It Failed"
- ex.howBad = 75
- ex.details = args['details']
- self.session.raiseException(handle, ex)
-
- elif methodName == "create_child":
- name = args['name']
- child = Data(self.sch_child)
- child.name = name
- addr = self.session.addData(child, name)
- handle.addReturnArgument("childAddr", addr.asMap())
- self.session.methodSuccess(handle)
+ try:
+ if methodName == "stop":
+ self.session.methodSuccess(handle)
+ self.cancel()
+
+ elif methodName == "echo":
+ handle.addReturnArgument("sequence", args["sequence"])
+ handle.addReturnArgument("map", args["map"])
+ self.session.methodSuccess(handle)
+
+ elif methodName == "event":
+ ev = Data(self.sch_event)
+ ev.text = args['text']
+ self.session.raiseEvent(ev, args['severity'])
+ self.session.methodSuccess(handle)
+
+ elif methodName == "fail":
+ if args['useString']:
+ self.session.raiseException(handle, args['stringVal'])
+ else:
+ ex = Data(self.sch_exception)
+ ex.whatHappened = "It Failed"
+ ex.howBad = 75
+ ex.details = args['details']
+ self.session.raiseException(handle, ex)
+
+ elif methodName == "create_child":
+ name = args['name']
+ child = Data(self.sch_child)
+ child.name = name
+ addr = self.session.addData(child, name)
+ handle.addReturnArgument("childAddr", addr.asMap())
+ self.session.methodSuccess(handle)
+ except BaseException, e:
+ self.session.raiseException(handle, "%r" % e)
def setupSchema(self):
@@ -128,6 +137,11 @@ class ExampleAgent(AgentHandler):
echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT))
self.sch_control.addMethod(echoMethod)
+ eventMethod = SchemaMethod("event", desc="Raise an Event")
+ eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN))
+ eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN))
+ self.sch_control.addMethod(eventMethod)
+
failMethod = SchemaMethod("fail", desc="Expected to Fail")
failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN))
failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN))
@@ -146,11 +160,18 @@ class ExampleAgent(AgentHandler):
self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING))
##
+ ## Declare the event class
+ ##
+ self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event")
+ self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING))
+
+ ##
## Register our schemata with the agent session.
##
self.session.registerSchema(self.sch_exception)
self.session.registerSchema(self.sch_control)
self.session.registerSchema(self.sch_child)
+ self.session.registerSchema(self.sch_event)
def populateData(self):
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb Thu Feb 17 14:08:14 2011
@@ -27,7 +27,7 @@ class FindAgents < Qmf2::ConsoleHandler
end
def agent_added(agent)
- puts "Agent Added: #{agent.to_s}"
+ puts "Agent Added: #{agent.name}"
end
def agent_deleted(agent, reason)
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/python/qmf2.py Thu Feb 17 14:08:14 2011
@@ -165,7 +165,7 @@ class ConsoleHandler(Thread):
reason = 'filter'
if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED:
reason = 'aged'
- self.agentDeleted(Agent(event.getAgent(), reason))
+ self.agentDeleted(Agent(event.getAgent()), reason)
elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART:
self.agentRestarted(Agent(event.getAgent()))
@@ -373,6 +373,16 @@ class AgentSession(object):
else:
self._impl.raiseException(handle, data)
+ def raiseEvent(self, data, severity=None):
+ """
+ """
+ if not severity:
+ self._impl.raiseEvent(data._impl)
+ else:
+ if (severity.__class__ != int and severity.__class__ != long) or severity < 0 or severity > 7:
+ raise Exception("Severity must be an int between 0..7")
+ self._impl.raiseEvent(data._impl, severity);
+
#===================================================================================================
# AGENT PROXY
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qmf2/ruby/qmf2.rb Thu Feb 17 14:08:14 2011
@@ -433,6 +433,14 @@ module Qmf2
def del_data(addr)
@impl.del_data(addr.impl)
end
+
+ def raise_event(data, severity=nil)
+ if !severity
+ @impl.raiseEvent(data.impl)
+ else
+ @impl.raiseEvent(data.impl, severity)
+ end
+ end
end
##==============================================================================
Modified: qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/bindings/qpid/Makefile.am Thu Feb 17 14:08:14 2011
@@ -17,10 +17,11 @@
# under the License.
#
+SUBDIRS = dotnet
+
if HAVE_SWIG
EXTRA_DIST = qpid.i
-SUBDIRS =
if HAVE_RUBY_DEVEL
SUBDIRS += ruby
Modified: qpid/branches/qpid-2935/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/configure.ac?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/configure.ac (original)
+++ qpid/branches/qpid-2935/qpid/cpp/configure.ac Thu Feb 17 14:08:14 2011
@@ -16,7 +16,7 @@ AC_INIT([qpidc],
[dev@qpid.apache.org])
AC_CONFIG_AUX_DIR([build-aux])
-AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects])
+AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects tar-ustar])
# Minimum Autoconf version required.
AC_PREREQ(2.59)
@@ -538,6 +538,7 @@ AC_CONFIG_FILES([
bindings/qpid/ruby/Makefile
bindings/qpid/python/Makefile
bindings/qpid/perl/Makefile
+ bindings/qpid/dotnet/Makefile
bindings/qmf/Makefile
bindings/qmf/ruby/Makefile
bindings/qmf/python/Makefile
Propchange: qpid/branches/qpid-2935/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
/qpid/branches/0.6-release-windows-installer/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803
/qpid/branches/0.6-release-windows-installer/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803,927218,927233
/qpid/branches/java-network-refactor/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:805429-825319
-/qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:1061302-1068442
+/qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:1061302-1071383
Propchange: qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Agent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:1061302-1068442
+/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:1061302-1071383
Propchange: qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Console.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:1061302-1068442
+/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:1061302-1071383
Modified: qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt Thu Feb 17 14:08:14 2011
@@ -313,6 +313,10 @@ if (NOT Boost_FILESYSTEM_LIBRARY)
set(Boost_FILESYSTEM_LIBRARY boost_filesystem)
endif (NOT Boost_FILESYSTEM_LIBRARY)
+if (NOT Boost_SYSTEM_LIBRARY)
+ set(Boost_SYSTEM_LIBRARY boost_system)
+endif (NOT Boost_SYSTEM_LIBRARY)
+
if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework)
endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
@@ -487,6 +491,7 @@ endif (BUILD_SASL)
CHECK_LIBRARY_EXISTS (xerces-c _init "" HAVE_XERCES)
CHECK_INCLUDE_FILE_CXX (xercesc/framework/MemBufInputSource.hpp HAVE_XERCES_H)
CHECK_INCLUDE_FILE_CXX (xqilla/xqilla-simple.hpp HAVE_XQILLA_H)
+CHECK_INCLUDE_FILE_CXX (xqilla/ast/XQEffectiveBooleanValue.hpp HAVE_XQ_EBV)
set (xml_default ${xml_force})
if (CMAKE_SYSTEM_NAME STREQUAL Windows)
@@ -510,6 +515,10 @@ if (BUILD_XML)
message(FATAL_ERROR "XML Exchange support requested but XQilla headers not found")
endif (NOT HAVE_XQILLA_H)
+ if (HAVE_XQ_EBV)
+ add_definitions(-DXQ_EFFECTIVE_BOOLEAN_VALUE_HPP)
+ endif (HAVE_XQ_EBV)
+
add_library (xml MODULE
qpid/xml/XmlExchange.cpp
qpid/xml/XmlExchange.h
@@ -956,6 +965,11 @@ set (qpidbroker_SOURCES
qpid/broker/Broker.cpp
qpid/broker/Exchange.cpp
qpid/broker/ExpiryPolicy.cpp
+ qpid/broker/Fairshare.cpp
+ qpid/broker/LegacyLVQ.cpp
+ qpid/broker/MessageDeque.cpp
+ qpid/broker/MessageMap.cpp
+ qpid/broker/PriorityQueue.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
@@ -1006,6 +1020,7 @@ set (qpidbroker_SOURCES
qpid/broker/SessionHandler.h
qpid/broker/SessionHandler.cpp
qpid/broker/System.cpp
+ qpid/broker/ThresholdAlerts.cpp
qpid/broker/TopicExchange.cpp
qpid/broker/TxAccept.cpp
qpid/broker/TxBuffer.cpp
@@ -1207,6 +1222,8 @@ install (TARGETS replication_exchange
# file whereas older builds only have config.h on autoconf-generated builds.
add_definitions(-DHAVE_CONFIG_H)
+add_definitions(-DBOOST_FILESYSTEM_VERSION=2)
+
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/config.h)
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -2,4 +2,4 @@
/qpid/branches/0.6-release-windows-installer/cpp/src/CMakeLists.txt:926803
/qpid/branches/0.6-release-windows-installer/qpid/cpp/src/CMakeLists.txt:926803,927233,932132
/qpid/branches/java-network-refactor/qpid/cpp/src/CMakeLists.txt:805429-825319
-/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1061302-1068442
+/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1061302-1071383
Modified: qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/Makefile.am Thu Feb 17 14:08:14 2011
@@ -24,6 +24,7 @@ SUBDIRS = . tests
# using Visual Studio solutions/projects.
windows_dist = \
qpid/client/windows/SaslFactory.cpp \
+ qpid/client/windows/SslConnector.cpp \
qpid/log/windows/SinkOptions.cpp \
qpid/log/windows/SinkOptions.h \
../include/qpid/sys/windows/check.h \
@@ -42,6 +43,8 @@ windows_dist = \
qpid/sys/windows/Shlib.cpp \
qpid/sys/windows/SocketAddress.cpp \
qpid/sys/windows/Socket.cpp \
+ qpid/sys/windows/SslAsynchIO.cpp \
+ qpid/sys/windows/SslAsynchIO.h \
qpid/sys/windows/StrError.cpp \
qpid/sys/windows/SystemInfo.cpp \
qpid/sys/windows/Thread.cpp \
@@ -51,7 +54,9 @@ windows_dist = \
qpid/sys/windows/uuid.h \
windows/QpiddBroker.cpp \
qpid/broker/windows/BrokerDefaults.cpp \
- qpid/broker/windows/SaslAuthenticator.cpp
+ qpid/broker/windows/SaslAuthenticator.cpp \
+ qpid/broker/windows/SslProtocolFactory.cpp \
+ qpid/messaging/HandleInstantiator.cpp
EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist)
@@ -122,6 +127,8 @@ qpidtest_SCRIPTS =
tmoduledir = $(libdir)/qpid/tests
tmodule_LTLIBRARIES=
+AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2
+
## Automake macros to build libraries and executables.
qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\"
libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
@@ -543,6 +550,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ExchangeRegistry.h \
qpid/broker/ExpiryPolicy.cpp \
qpid/broker/ExpiryPolicy.h \
+ qpid/broker/Fairshare.h \
+ qpid/broker/Fairshare.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/FanOutExchange.h \
qpid/broker/FedOps.h \
@@ -550,6 +559,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
qpid/broker/AsyncCompletion.h \
+ qpid/broker/LegacyLVQ.h \
+ qpid/broker/LegacyLVQ.cpp \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
qpid/broker/LinkRegistry.cpp \
@@ -560,9 +571,16 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageBuilder.h \
+ qpid/broker/MessageDeque.h \
+ qpid/broker/MessageDeque.cpp \
+ qpid/broker/MessageMap.h \
+ qpid/broker/MessageMap.cpp \
+ qpid/broker/Messages.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/PriorityQueue.h \
+ qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
@@ -584,6 +602,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueEvents.h \
qpid/broker/QueueListeners.cpp \
qpid/broker/QueueListeners.h \
+ qpid/broker/QueueObserver.h \
qpid/broker/QueuePolicy.cpp \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.cpp \
@@ -632,6 +651,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SignalHandler.h \
qpid/broker/System.cpp \
qpid/broker/System.h \
+ qpid/broker/ThresholdAlerts.cpp \
+ qpid/broker/ThresholdAlerts.h \
qpid/broker/TopicExchange.cpp \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/Agent.cpp Thu Feb 17 14:08:14 2011
@@ -339,7 +339,7 @@ void AgentImpl::handleMethodResponse(con
uint32_t correlator;
boost::shared_ptr<SyncContext> context;
- QPID_LOG(trace, "RCVD MethodResponse map=" << response);
+ QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response);
aIter = response.find("_arguments");
if (aIter != response.end())
@@ -556,13 +556,14 @@ void AgentImpl::sendQuery(const Query& q
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
encode(QueryImplAccess::get(query).asMap(), msg);
- if (sender.isValid())
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT QueryRequest to=" << name);
+ QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator);
+ }
}
@@ -583,13 +584,14 @@ void AgentImpl::sendMethod(const string&
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
encode(map, msg);
- if (sender.isValid())
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
+ QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator);
+ }
}
void AgentImpl::sendSchemaRequest(const SchemaId& id)
@@ -626,12 +628,13 @@ void AgentImpl::sendSchemaRequest(const
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
- if (sender.isValid())
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
+ QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject);
+ }
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/AgentSession.cpp Thu Feb 17 14:08:14 2011
@@ -571,7 +571,7 @@ void AgentSessionImpl::raiseEvent(const
encode(list, msg);
topicSender.send(msg);
- QPID_LOG(trace, "SENT EventIndication to=" << subject);
+ QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
}
@@ -625,7 +625,7 @@ void AgentSessionImpl::setAgentName()
void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
- QPID_LOG(trace, "RCVD AgentLocateRequest");
+ QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());
if (!predicate.empty()) {
Query agentQuery(QUERY_OBJECT);
@@ -659,7 +659,7 @@ void AgentSessionImpl::handleLocateReque
void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
{
- QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo());
+ QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application.
@@ -719,7 +719,7 @@ void AgentSessionImpl::handleMethodReque
void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
{
- QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo());
+ QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application or directly handled by the agent.
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSession.cpp Thu Feb 17 14:08:14 2011
@@ -65,7 +65,7 @@ Subscription ConsoleSession::subscribe(c
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+ connection(c), domain("default"), maxAgentAgeMinutes(5),
opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
@@ -394,6 +394,7 @@ void ConsoleSessionImpl::sendAgentLocate
{
Message msg;
Variant::Map& headers(msg.getProperties());
+ static const string subject("console.request.agent_locate");
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
@@ -401,12 +402,12 @@ void ConsoleSessionImpl::sendAgentLocate
msg.setReplyTo(replyAddress);
msg.setCorrelationId("agent-locate");
- msg.setSubject("console.request.agent_locate");
+ msg.setSubject(subject);
encode(agentQuery.getPredicate(), msg);
topicSender.send(msg);
- QPID_LOG(trace, "SENT AgentLocate to topic");
+ QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qmf/ConsoleSessionImpl.h Thu Feb 17 14:08:14 2011
@@ -72,7 +72,6 @@ namespace qmf {
qpid::messaging::Sender directSender;
qpid::messaging::Sender topicSender;
std::string domain;
- std::string authUser;
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
Propchange: qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 17 14:08:14 2011
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1068442
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1071383
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/RefCountedBuffer.h Thu Feb 17 14:08:14 2011
@@ -27,7 +27,7 @@
#include <boost/intrusive_ptr.hpp>
namespace qpid {
-// FIXME aconway 2008-09-06: easy to add alignment
+
/**
* Reference-counted byte buffer.
* No alignment guarantees.
@@ -51,7 +51,7 @@ public:
pointer(const pointer&);
~pointer();
pointer& operator=(const pointer&);
-
+
char* get() { return cp(); }
operator char*() { return cp(); }
char& operator*() { return *cp(); }
@@ -62,7 +62,7 @@ public:
const char& operator*() const { return *cp(); }
const char& operator[](size_t i) const { return cp()[i]; }
};
-
+
/** Create a reference counted buffer of size n */
static pointer create(size_t n);
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Thu Feb 17 14:08:14 2011
@@ -103,7 +103,8 @@ Broker::Options::Options(const std::stri
requireEncrypted(false),
maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
- qmf2Support(false),
+ qmf2Support(true),
+ qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70)
{
@@ -125,7 +126,8 @@ Broker::Options::Options(const std::stri
("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
- ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
+ ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -153,7 +155,7 @@ const std::string knownHostsNone("none")
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
store(new NullMessageStore),
@@ -225,7 +227,6 @@ Broker::Broker(const Broker::Options& co
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
- queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Thu Feb 17 14:08:14 2011
@@ -115,6 +115,7 @@ public:
uint32_t maxSessionRate;
bool asyncQueueEvents;
bool qmf2Support;
+ bool qmf1Support;
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.cpp Thu Feb 17 14:08:14 2011
@@ -400,22 +400,6 @@ bool Message::hasExpired()
return expiryPolicy && expiryPolicy->hasExpired(*this);
}
-boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
-{
- sys::Mutex::ScopedLock l(lock);
- Replacement::iterator i = replacement.find(qfor);
- if (i != replacement.end()){
- return i->second;
- }
- return empty;
-}
-
-void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
-{
- sys::Mutex::ScopedLock l(lock);
- replacement[qfor] = msg;
-}
-
namespace {
struct ScopedSet {
sys::Monitor& lock;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Message.h Thu Feb 17 14:08:14 2011
@@ -153,8 +153,6 @@ public:
void forcePersistent();
bool isForcedPersistent();
- boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
- void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
@@ -163,8 +161,6 @@ public:
uint8_t getPriority() const;
private:
- typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
MessageAdapter& getAdapter() const;
void allDequeuesComplete();
@@ -183,7 +179,6 @@ public:
static TransferAdapter TRANSFER;
- mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 17 14:08:14 2011
@@ -23,11 +23,16 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
//following feature is not ready for general use as it doesn't handle
//the case where a message is enqueued on more than one queue well enough:
const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _
consumerCount(0),
exclusive(0),
noLocal(false),
- lastValueQueue(false),
- lastValueQueueNoBrowse(false),
persistLastNode(false),
inLastNodeFailure(false),
+ messages(new MessageDeque()),
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0),
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr
} else {
enqueue(0, msg);
push(msg);
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr
msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
- mgntEnqStats(msg);
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage&
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ messages->reinsert(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage&
copy.notify();
}
-void Queue::clearLVQIndex(const QueuedMessage& msg){
- assertClusterSafe();
- const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
-}
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
-
- Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ if (messages->remove(position, message)) {
+ QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
+ } else {
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+ }
}
bool Queue::acquire(const QueuedMessage& msg) {
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
-
- QPID_LOG(debug, "attempting to acquire " << msg.position);
- Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
- (!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- }
-
- QPID_LOG(debug, "Acquire failed for " << msg.position);
- return false;
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -282,7 +251,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMes
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = messages->front();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMes
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- popMsg(msg);
+ pop();
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMess
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) m.payload = replacement;
- }
return true;
} else {
//browser hasn't got enough credit for the message
@@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::sha
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_pt
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < getFront().position) {
- msg = getFront();
- return true;
- } else {
- Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
- msg = *(pos+1);
- return true;
- }
- }
+ if (messages->next(c->position, msg)) {
+ return true;
+ } else {
+ listeners.addListener(c);
+ return false;
}
- listeners.addListener(c);
- return false;
-}
-
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
- return i;
- }
- return messages.end(); // no match found.
}
-
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
- return *i;
- }
- return QueuedMessage();
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr
consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
}
void Queue::cancel(Consumer::shared_ptr c){
@@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
+ messages->pop(msg);
+ return msg;
+}
- if(!messages.empty()){
- msg = getFront();
- popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+ if (message.payload->hasExpired()) {
+ expired.push_back(message);
+ return true;
+ } else {
+ return false;
}
- return msg;
}
void Queue::purgeExpired()
@@ -492,37 +434,11 @@ void Queue::purgeExpired()
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
- //Note: This method is currently called periodically on the timer
- //thread. In a clustered broker this means that the purging does
- //not occur on the cluster event dispatch thread and consequently
- //that is not totally ordered w.r.t other events (including
- //publication of messages). However the cluster does ensure that
- //the actual expiration of messages (as distinct from the removing
- //of those expired messages from the queue) *is* consistently
- //ordered w.r.t. cluster events. This means that delivery of
- //messages is in general consistent across the cluster inspite of
- //any non-determinism in the triggering of a purge. However at
- //present purging a last value queue could potentially cause
- //inconsistencies in the cluster (as the order w.r.t publications
- //can affect the order in which messages appear in the
- //queue). Consequently periodic purging of an LVQ is not enabled
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
-
- if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
- Messages expired;
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- //Re-introduce management of LVQ-specific state here
- //if purging is renabled for that case (see note above)
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
- }
- }
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t pur
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
// so they don't wind up getting purged more than once.
//
- DeliverableMessage msg(getFront().payload);
+ DeliverableMessage msg(messages->front().payload);
rerouteQueue.push_back(msg);
}
popAndDequeue();
@@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = getFront();
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- popMsg(qmsg);
+ pop();
dequeue(0, qmsg);
count++;
}
return count;
}
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
{
assertClusterSafe();
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
- messages.pop_front();
+ messages->pop();
++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
+ QueuedMessage removed;
+ bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
- LVQ::iterator i;
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
-
- i = lvq.find(key);
- if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
- listeners.populate(copy);
- lvq[key] = msg;
- }else {
- boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
- if (!old) old = i->second;
- i->second->setReplacementMessage(msg,this);
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
- } else {
- Mutex::ScopedUnlock u(messageLock);
- dequeue(0, QueuedMessage(qm.queue, old, qm.position));
- }
- }
- }else {
- messages.push_back(qm);
- listeners.populate(copy);
- }
- if (eventMode) {
- if (eventMgr) eventMgr->enqueued(qm);
- else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
- }
- if (policy.get()) {
- policy->enqueued(qm);
- }
- if (flowLimit.get())
- flowLimit->enqueued(qm);
+ dequeueRequired = messages->push(qm, removed);
+ listeners.populate(copy);
+ enqueued(qm);
}
copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (dequeueRequired) {
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(removed);
+ } else {
+ dequeue(0, removed);
+ }
}
- return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) {
- const framing::FieldTable* ft = replacement->getApplicationHeaders();
- if (ft) {
- string key = ft->getAsString(qpidVQMatchProperty);
- if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
- }
- msg.payload = replacement;
- }
- return msg;
+ if (message.payload->isIngressComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessag
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- //NOTE: don't need to use checkLvqReplace() here as it
- //is only relevant for LVQ which does not support persistence
- //so the enqueueComplete check has no effect
- if ( i->payload->isIngressComplete() ) count ++;
- }
-
+ messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages->size();
}
uint32_t Queue::getConsumerCount() const
@@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(consumerLock);
- return autodelete && !consumerCount;
+ return autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
+void Queue::forcePersistent(QueuedMessage& message)
+{
+ if(!message.payload->isStoredOnQueue(shared_from_this())) {
+ message.payload->forcePersistent();
+ if (message.payload->isForcedPersistent() ){
+ enqueue(0, message.payload);
+ }
+ }
+}
+
void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
- }
- }
- }
+ messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
} catch (const std::exception& e) {
// Could not go into last node standing (for example journal not large enough)
QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext*
if (!u.acquired) return false;
if (policy.get() && !suppressPolicyCheck) {
- Messages dequeues;
+ std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
policy->tryEnqueue(msg);
@@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const Queue
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = getFront();
- popMsg(msg);
+ QueuedMessage msg = messages->front();
+ pop();
dequeue(0, msg);
}
@@ -845,11 +708,16 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
+ /** todo KAG make flowLimit an observer */
if (flowLimit.get())
flowLimit->dequeued(msg);
mgntDeqStats(msg.payload);
- if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
- eventMgr->dequeued(msg);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->dequeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+ }
}
}
@@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _se
configure(_settings);
}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+ if (eventMode && broker) {
+ broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+ }
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (eventMgr && !eventMgr->isSync() ) {
+ } else if (broker && !(broker->getQueueEvents().isSync()) ) {
QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
}
FieldTable copy(_settings);
@@ -881,17 +774,30 @@ void Queue::configure(const FieldTable&
} else {
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+ }
+
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
- lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
- lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
- if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
- lastValueQueue = lastValueQueueNoBrowse;
+ std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+ if (lvqKey.size()) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
+ messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ } else if (_settings.get(qpidLastValueQueue)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ } else {
+ std::auto_ptr<Messages> m = Fairshare::create(_settings);
+ if (m.get()) {
+ messages = m;
+ QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ }
}
persistLastNode= _settings.get(qpidPersistLastNode);
@@ -910,6 +816,10 @@ void Queue::configure(const FieldTable&
flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
+ autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if (flowLimit.get())
@@ -924,8 +834,8 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
- DeliverableMessage msg(getFront().payload);
+ while(!messages->empty()){
+ DeliverableMessage msg(messages->front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
@@ -939,6 +849,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
}
void Queue::notifyDeleted()
@@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAl
return alternateExchange;
}
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+ Broker& broker;
+ Queue::shared_ptr queue;
+
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+ void fire()
+ {
+ //need to detect case where queue was used after the task was
+ //created, but then became unused again before the task fired;
+ //in this case ignore this request as there will have already
+ //been a later task added
+ tryAutoDeleteImpl(broker, queue);
+ }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ broker.getClusterTimer().add(queue->autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ } else {
+ tryAutoDeleteImpl(broker, queue);
+ }
+}
+
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
@@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership()
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
@@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() {
int Queue::getEventMode() { return eventMode; }
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
- eventMgr = &mgr;
-}
-
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
@@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const
void Queue::enqueued(const QueuedMessage& m)
{
- if (m.payload) {
- if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+ try {
+ (*i)->enqueued(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
- if (flowLimit.get())
- flowLimit->enqueued(m);
- mgntEnqStats(m.payload);
+ }
+ if (policy.get()) {
+ policy->enqueued(m);
+ }
+ /** todo make flowlimit an observer */
+ if (flowLimit.get())
+ flowLimit->enqueued(m);
+ mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ }
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessa
}
QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
void Queue::checkNotDeleted()
{
@@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted()
}
}
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ observers.insert(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 17 14:08:14 2011
@@ -26,14 +26,17 @@
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
#include "qpid/framing/amqp_types.h"
@@ -46,6 +49,7 @@
#include <vector>
#include <memory>
#include <deque>
+#include <set>
#include <algorithm>
namespace qpid {
@@ -86,10 +90,10 @@ class Queue : public boost::enable_share
~ScopedUse() { if (acquired) barrier.release(); }
};
- typedef std::deque<QueuedMessage> Messages;
- typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
+ typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -97,16 +101,13 @@ class Queue : public boost::enable_share
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
- bool lastValueQueue;
- bool lastValueQueueNoBrowse;
bool persistLastNode;
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
- Messages messages;
- Messages pendingDequeues;//used to avoid dequeuing during recovery
- LVQ lvq;
+ std::auto_ptr<Messages> messages;
+ std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -122,12 +123,14 @@ class Queue : public boost::enable_share
qmf::org::apache::qpid::broker::Queue* mgmtObject;
RateTracker dequeueTracker;
int eventMode;
- QueueEvents* eventMgr;
+ Observers observers;
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
bool deleted;
UsageBarrier barrier;
+ int autoDeleteTimeout;
+ boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -141,12 +144,13 @@ class Queue : public boost::enable_share
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void enqueued(const QueuedMessage& msg);
void dequeued(const QueuedMessage& msg);
- void popMsg(QueuedMessage& qmsg);
+ void pop();
void popAndDequeue();
QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg);
- void clearLVQIndex(const QueuedMessage& msg);
+ void forcePersistent(QueuedMessage& msg);
+ int getEventMode();
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -171,7 +175,6 @@ class Queue : public boost::enable_share
}
}
- Messages::iterator findAt(framing::SequenceNumber pos);
void checkNotDeleted();
public:
@@ -277,7 +280,7 @@ class Queue : public boost::enable_share
* thus are still logically on the queue) - used in
* clustered broker.
*/
- void enqueued(const QueuedMessage& msg);
+ void updateEnqueued(const QueuedMessage& msg);
/**
* Test whether the specified message (identified by its
@@ -322,13 +325,7 @@ class Queue : public boost::enable_share
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
- if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(checkLvqReplace(*i));
- }
- } else {
- std::for_each(messages.begin(), messages.end(), f);
- }
+ messages->foreach(f);
}
/** Apply f to each QueueBinding on the queue */
@@ -344,8 +341,7 @@ class Queue : public boost::enable_share
/** return current position sequence number for the next message on the queue.
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
- int getEventMode();
- void setQueueEventManager(QueueEvents&);
+ void addObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
@@ -354,6 +350,8 @@ class Queue : public boost::enable_share
// For cluster update
QueueListeners& getListeners();
+ Messages& getMessages();
+ const Messages& getMessages() const;
/**
* Reserve space in policy for an enqueued message that
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.cpp Thu Feb 17 14:08:14 2011
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
@@ -115,6 +117,29 @@ bool QueueEvents::isSync()
return sync;
}
+class EventGenerator : public QueueObserver
+{
+ public:
+ EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {}
+ void enqueued(const QueuedMessage& m)
+ {
+ manager.enqueued(m);
+ }
+ void dequeued(const QueuedMessage& m)
+ {
+ if (!enqueueOnly) manager.dequeued(m);
+ }
+ private:
+ QueueEvents& manager;
+ const bool enqueueOnly;
+};
+
+void QueueEvents::observe(Queue& queue, bool enqueueOnly)
+{
+ boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly));
+ queue.addObserver(observer);
+}
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=1071615&r1=1071614&r2=1071615&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueEvents.h Thu Feb 17 14:08:14 2011
@@ -63,6 +63,7 @@ class QueueEvents
QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
void enable();
void disable();
+ void observe(Queue&, bool enqueueOnly);
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
QPID_BROKER_EXTERN bool isSync();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org