You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/27 03:26:14 UTC
svn commit: r928123 [1/2] - in /qpid/branches/qmf-devel0.7a/qpid: ./ cpp/
cpp/examples/messaging/ cpp/examples/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/agent/ cpp/include/qpid/messaging/ cpp/src/
cpp/src/qmf/engine/ cpp/src/qpid/agent/ cpp/s...
Author: tross
Date: Sat Mar 27 02:26:12 2010
New Revision: 928123
URL: http://svn.apache.org/viewvc?rev=928123&view=rev
Log:
Merged the trunk back into the branch.
Added:
qpid/branches/qmf-devel0.7a/qpid/doc/book/src/Java-JMS-Selector-Syntax.xml
- copied unchanged from r928107, qpid/trunk/qpid/doc/book/src/Java-JMS-Selector-Syntax.xml
qpid/branches/qmf-devel0.7a/qpid/doc/book/src/images/qpid-logo.png
- copied unchanged from r928107, qpid/trunk/qpid/doc/book/src/images/qpid-logo.png
qpid/branches/qmf-devel0.7a/qpid/java/tools/README
- copied unchanged from r928107, qpid/trunk/qpid/java/tools/README
qpid/branches/qmf-devel0.7a/qpid/java/tools/etc/
- copied from r928107, qpid/trunk/qpid/java/tools/etc/
qpid/branches/qmf-devel0.7a/qpid/java/tools/etc/jndi.properties
- copied unchanged from r928107, qpid/trunk/qpid/java/tools/etc/jndi.properties
qpid/branches/qmf-devel0.7a/qpid/java/tools/etc/test.log4j
- copied unchanged from r928107, qpid/trunk/qpid/java/tools/etc/test.log4j
qpid/branches/qmf-devel0.7a/qpid/packaging/
- copied from r928107, qpid/trunk/qpid/packaging/
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/ (props changed)
- copied from r928107, qpid/trunk/qpid/packaging/windows/
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/INSTALL_NOTES.html
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/INSTALL_NOTES.html
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/LICENSE.rtf
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/LICENSE.rtf
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/build_installer.bat
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/build_installer.bat
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/installer.proj
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/installer.proj
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/qpid-icon.ico
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/qpid-icon.ico
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/qpid-install-background.bmp
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/qpid-install-background.bmp
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/qpid-install-banner.bmp
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/qpid-install-banner.bmp
qpid/branches/qmf-devel0.7a/qpid/packaging/windows/qpidc.wxs
- copied unchanged from r928107, qpid/trunk/qpid/packaging/windows/qpidc.wxs
Modified:
qpid/branches/qmf-devel0.7a/qpid/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/CMakeLists.txt
qpid/branches/qmf-devel0.7a/qpid/cpp/examples/messaging/readme.txt
qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj (contents, props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qmf/engine/Agent.h (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qmf/engine/Console.h (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/messaging/Address.h
qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qmf/engine/Agent.cpp (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (contents, props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.h (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Address.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.h
qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp
qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster.mk
qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py (contents, props changed)
qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/run_long_cluster_tests
qpid/branches/qmf-devel0.7a/qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml
qpid/branches/qmf-devel0.7a/qpid/doc/book/src/schemas.xml
qpid/branches/qmf-devel0.7a/qpid/dotnet/build-msbuild.bat (props changed)
qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant-release (props changed)
qpid/branches/qmf-devel0.7a/qpid/dotnet/build-nant.bat (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/broker/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/broker/bin/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/lib/org.osgi.core_1.0.0.jar (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/client/src/main/java/org/apache/qpid/management/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/client/src/test/java/org/apache/qpid/management/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/08StandaloneExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/CPPExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/Excludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaStandaloneExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/JavaTransientExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/XAExcludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/clean-dir (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.async.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.cluster.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.noprefetch.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.excludes (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.ssl.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/cpp.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/default.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java-derby.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/java.testprofile (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/log4j-test.xml (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test-provider.properties (props changed)
qpid/branches/qmf-devel0.7a/qpid/java/test-profiles/test_resources/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/python/ (props changed)
qpid/branches/qmf-devel0.7a/qpid/python/examples/api/spout (props changed)
qpid/branches/qmf-devel0.7a/qpid/python/qpid/brokertest.py
qpid/branches/qmf-devel0.7a/qpid/python/qpid/compat.py
qpid/branches/qmf-devel0.7a/qpid/python/qpid/concurrency.py (contents, props changed)
qpid/branches/qmf-devel0.7a/qpid/python/qpid/datatypes.py
qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/driver.py
qpid/branches/qmf-devel0.7a/qpid/python/qpid/messaging/endpoints.py
qpid/branches/qmf-devel0.7a/qpid/python/qpid/tests/messaging/endpoints.py
qpid/branches/qmf-devel0.7a/qpid/ruby/ext/sasl/extconf.rb (props changed)
qpid/branches/qmf-devel0.7a/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py (props changed)
qpid/branches/qmf-devel0.7a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj
qpid/branches/qmf-devel0.7a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj
qpid/branches/qmf-devel0.7a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj
qpid/branches/qmf-devel0.7a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj
qpid/branches/qmf-devel0.7a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj
Propchange: qpid/branches/qmf-devel0.7a/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1,3 +1,5 @@
/qpid/branches/0.5.x-dev/qpid:892761,894875
+/qpid/branches/0.6-release-windows-installer:926803
+/qpid/branches/0.6-release-windows-installer/qpid:926803,927233
/qpid/branches/java-network-refactor/qpid:805429-825319
-/qpid/trunk/qpid:919043-926753
+/qpid/trunk/qpid:919043-928107
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/CMakeLists.txt?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/CMakeLists.txt (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/CMakeLists.txt Sat Mar 27 02:26:12 2010
@@ -50,9 +50,8 @@ set (QPIDC_CONF_FILE ${QPID_INSTALL_CONF
set (QPIDD_CONF_FILE ${QPID_INSTALL_CONFDIR}/qpidd.conf CACHE STRING
"Name of the Qpid broker configuration file")
-install(FILES LICENSE NOTICE README SSL RELEASE_NOTES DESIGN
- xml/cluster.xml INSTALL-WINDOWS
- DESTINATION ${QPID_INSTALL_DATADIR})
+install(FILES LICENSE NOTICE DESTINATION ${CMAKE_INSTALL_PREFIX})
+install(FILES xml/cluster.xml DESTINATION ${QPID_INSTALL_DATADIR})
if (WIN32)
set (CMAKE_DEBUG_POSTFIX "d")
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/messaging/readme.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/examples/messaging/readme.txt?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/examples/messaging/readme.txt (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/examples/messaging/readme.txt Sat Mar 27 02:26:12 2010
@@ -96,7 +96,7 @@ of address (as there is no existing enti
type and as we do not want the default type to be created, namely a
queue):
-* run: ./drain -f --address 'my-new-topic; {create: always, node-properties:{type:topic}}'
+* run: ./drain -f --address 'my-new-topic; {create: always, node:{type:topic}}'
* then run: ./spout --address my-new-queue
The value to the create policy is one of always, sender, receiver or
@@ -128,19 +128,17 @@ qpid-config or even auto-create one):
An example using xquery based filtering with the xml exchange:
* First start a subscriber with an xquery filter specified:
- ./drain -f --address 'xml/my-subject; {filter:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}'
+ ./drain -f --address 'xml; {link:{x-bindings:[{arguments:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}]}}'
* Then test receipt of messages that match the xquery filter:
- ./spout --address 'xml/my-subject' --property colour=red --content 'matched!'
+ ./spout --address 'xml' --property colour=red --content 'matched!'
and
- ./spout --address 'xml/my-subject' --property colour=blue --content 'not matched'
+ ./spout --address 'xml' --property colour=blue --content 'not matched'
TODO:
* auto-creating exchanges of different types
-* xml content in the xquery example
-
* 'durable' and 'reliable' subscriptions
* map content
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj Sat Mar 27 02:26:12 2010
@@ -72,7 +72,7 @@
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
MinimalRebuild="false"
BasicRuntimeChecks="3"
RuntimeLibrary="3"
@@ -154,7 +154,7 @@
Name="VCCLCompilerTool"
Optimization="2"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="2"
RuntimeTypeInfo="true"
WarningLevel="3"
@@ -235,7 +235,7 @@
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
MinimalRebuild="false"
BasicRuntimeChecks="3"
RuntimeLibrary="3"
@@ -318,7 +318,7 @@
Name="VCCLCompilerTool"
Optimization="2"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="2"
RuntimeTypeInfo="true"
WarningLevel="3"
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -0,0 +1,5 @@
+/qpid/branches/0.5.x-dev/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:892761,894875
+/qpid/branches/0.6-release-windows-installer/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803
+/qpid/branches/0.6-release-windows-installer/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803,927218,927233
+/qpid/branches/java-network-refactor/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:805429-825319
+/qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:919043-928107
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qmf/engine/Agent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:919043-926753
+/qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h:919043-928107
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qmf/engine/Console.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:919043-926753
+/qpid/trunk/qpid/cpp/include/qmf/engine/Console.h:919043-928107
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h Sat Mar 27 02:26:12 2010
@@ -24,7 +24,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
-#include "qpid/sys/Mutex.h"
#include "qpid/client/ConnectionSettings.h"
namespace qpid {
@@ -45,11 +44,6 @@ class ManagementAgent
QMF_AGENT_EXTERN Singleton(bool disableManagement = false);
QMF_AGENT_EXTERN ~Singleton();
QMF_AGENT_EXTERN static ManagementAgent* getInstance();
- private:
- static sys::Mutex lock;
- static bool disabled;
- static int refCount;
- static ManagementAgent* agent;
};
typedef enum {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/messaging/Address.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/messaging/Address.h?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/messaging/Address.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/messaging/Address.h Sat Mar 27 02:26:12 2010
@@ -65,82 +65,57 @@ class AddressImpl;
*
* <table border=0>
*
- * <tr valign=top><td>create</td><td>Indicate whether the address should be
- * automatically created or not. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>. The properties of
- * the node to be created can be specified via the node-properties
- * option (see below).</td></tr>
- *
- * <tr valign=top><td>assert</td><td>Indicate whether or not to assert any specified
- * node-properties match the address. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
- *
- * <tr valign=top><td>delete</td><td>Indicate whether or not to delete the addressed
- * nide when a sender or receiver is cancelled. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
+ * <tr valign=top>
+ * <td>create</td>
+ * <td>Indicate whether the address should be automatically created
+ * or not. Can be one of <i>always</i>, <i>never</i>,
+ * <i>sender</i> or <i>receiver</i>. The properties of the node
+ * to be created can be specified via the node options (see
+ * below).
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>assert</td>
+ * <td>Indicate whether or not to assert any specified node
+ * properties (see below) match the address. Can be one of
+ * <i>always</i>, <i>never</i>, <i>sender</i> or
+ * <i>receiver</i>.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>delete</td>
+ * <td>Indicate whether or not to delete the addressed node when a
+ * sender or receiver is cancelled. Can be one of <i>always</i>,
+ * <i>never</i>, <i>sender</i> or <i>receiver</i>.
+ * </td>
+ * </tr>
*
- * <tr valign=top><td>reliability</td><td>indicates the level of
- * reliability expected. Can be one of unreliable, at-most-once,
- * at-least-once or exactly-once (the latter is not yet correctly
- * supported).</td></tr>
- *
- * <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed
- * entity or 'node'. These can be used when automatically creating it,
- * or to assert certain properties.
- *
- * The valid node-properties are:
- * <ul>
- * <li>type - queue or topic</li>
- *
- * <li>durable - true or false</li>
- *
- * <li>x-properties - a nested map that can contain implementation or
- * protocol specifiec extedned properties. For the amqp 0-10 mapping,
- * the fields in queue- or exchange- declare can be specified in here;
- * a bindings entry may also be specified, whose value should be an
- * array of strings of the form exchange/key; anything else will be
- * passed through in the arguments field.
- * </li>
- * </ul>
- * </td></tr>
- *
- * </table>
+ * <tr valign=top>
+ * <td>node</td>
+ * <td>A nested map describing properties of the addressed
+ * node. Current properties supported are type (topic or queue),
+ * durable (boolean), x-declare and x-bindings.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>link</td>
+ * <td>A nested map through which properties of the 'link' from
+ * sender/receiver to node can be configured. Current properties
+ * are name, durable, reliability, x-declare, x-subscribe and
+ * x-bindings.
+ * </td>
+ * </tr>
*
- * For receivers there are some further options of interest:
+ * For receivers there is one other option of interest:
*
* <table border=0 valign=top>
- *
- * <tr valign=top><td>no-local</td><td>(only relevant for topics at present) specifies that the
- * receiver does not want to receiver messages published to the topic
- * that originate from a sender on the same connection</td></tr>
- *
* <tr valign=top><td>mode</td><td>(only relevant for queues)
* indicates whether the subscribe should consume (the default) or
* merely browse the messages. Valid values are 'consume' and
* 'browse'</td></tr>
- *
- * <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a
- * durable subscription is required</td></tr>
- *
- * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteria (or list of
- * criteria).</td></tr>
- *
- * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options
- * to be specified for a receiver; this is a nested map and currently
- * the implementation only recognises two specific nested properties
- * within it (all others are passed through in the arguments of the
- * message-subscribe command):
- *
- * <ul>
- * <li>exclusive, which requests an exclusive subscription and
- * is only relevant for queues</li>
- *
- * <li>x-queue-arguments, which is only relevant for topics and
- * allows arguments to the queue-declare for the subscription
- * queue to be specified</li>
- * </ul>
- * </td></tr>
* </table>
*/
class Address
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/CMakeLists.txt Sat Mar 27 02:26:12 2010
@@ -246,14 +246,47 @@ if (MSVC)
${_boost_regex_debug} ${_boost_regex_release}
${_boost_system_debug} ${_boost_system_release}
${_boost_thread_debug} ${_boost_thread_release}
- DESTINATION ${QPID_INSTALL_LIBDIR}
+ DESTINATION ${QPID_INSTALL_LIBDIR}/boost
COMPONENT ${QPID_COMPONENT_COMMON})
endif (QPID_LINK_BOOST_DYNAMIC)
- # Need the boost headers regardless of which way the libs go.
+ # Need the boost headers regardless of which way the libs go. Try to
+ # weed out what we don't need, else it's giant and unnecessary.
install (DIRECTORY ${Boost_INCLUDE_DIR}/boost
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
- COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE})
+ COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE}
+ PATTERN "accumulators/*" EXCLUDE
+ PATTERN "algorithm/*" EXCLUDE
+ PATTERN "archive/*" EXCLUDE
+ PATTERN "asio*" EXCLUDE
+ PATTERN "bimap*" EXCLUDE
+ PATTERN "circular_buffer*" EXCLUDE
+ PATTERN "concept*" EXCLUDE
+ PATTERN "dynamic_bitset*" EXCLUDE
+ PATTERN "flyweight*" EXCLUDE
+ PATTERN "fusion*" EXCLUDE
+ PATTERN "gil*" EXCLUDE
+ PATTERN "graph*" EXCLUDE
+ PATTERN "interprocess*" EXCLUDE
+ PATTERN "lambda/*" EXCLUDE
+ PATTERN "logic*" EXCLUDE
+ PATTERN "math*" EXCLUDE
+ PATTERN "mpi*" EXCLUDE
+ PATTERN "multi_*" EXCLUDE
+ PATTERN "numeric*" EXCLUDE
+ PATTERN "pending*" EXCLUDE
+ PATTERN "pool*" EXCLUDE
+ PATTERN "property_map*" EXCLUDE
+ PATTERN "proto*" EXCLUDE
+ PATTERN "random*" EXCLUDE
+ PATTERN "range*" EXCLUDE
+ PATTERN "signals*" EXCLUDE
+ PATTERN "spirit*" EXCLUDE
+ PATTERN "statechart*" EXCLUDE
+ PATTERN "units*" EXCLUDE
+ PATTERN "unordered*" EXCLUDE
+ PATTERN "wave*" EXCLUDE
+ PATTERN "xpressive*" EXCLUDE)
set(Boost_DATE_TIME_LIBRARY "")
set(Boost_THREAD_LIBRARY "")
@@ -590,6 +623,19 @@ install (TARGETS qpidcommon
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_COMMON})
+if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ # Not built... if needed, add the build option then uncomment this.
+ #get_target_property(qpidcommon_dll qpidcommon LOCATION)
+ #string(REPLACE .dll .pdb qpidcommon_pdb ${qpidcommon_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidcommon_pdb ${qpidcommon_pdb})
+ #message(STATUS "_pdb: ${qpidcommon_pdb}")
+ #install (PROGRAMS
+ # ${qpidcommon_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_CLIENT})
+endif (WIN32)
+
set (qpidclient_SOURCES
${rgen_client_srcs}
${qpidclient_platform_SOURCES}
@@ -681,6 +727,7 @@ install (DIRECTORY ../include/qpid
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE}
PATTERN ".svn" EXCLUDE)
+
# Released source artifacts from Apache have the generated headers included in
# the source tree, not the binary tree. So don't attempt to grab them when
# they're not supposed to be there.
@@ -691,6 +738,18 @@ if (NOT QPID_GENERATED_HEADERS_IN_SOURCE
endif (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ #get_target_property(qpidclient_dll qpidclient LOCATION)
+ #string(REPLACE .dll .pdb qpidclient_pdb ${qpidclient_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidclient_pdb ${qpidclient_pdb})
+ #message(STATUS "_pdb: ${qpidclient_pdb}")
+ #install (PROGRAMS
+ # ${qpidclient_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_CLIENT})
+endif (WIN32)
+
+if (WIN32)
set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
set(DTC_PLUGIN_SOURCE ${AMQP_WCF_DIR}/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp)
if (EXISTS ${DTC_PLUGIN_SOURCE})
@@ -905,6 +964,17 @@ set_target_properties (qmfconsole PROPER
install (TARGETS qmfconsole
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
+if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ #get_target_property(qmfconsole_dll qmfconsole LOCATION)
+ #string(REPLACE .dll .pdb qmfconsole_pdb ${qmfconsole_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qmfconsole_pdb ${qmfconsole_pdb})
+ #message(STATUS "_pdb: ${qmfconsole_pdb}")
+ #install (PROGRAMS
+ # ${qmfconsole_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_QMF})
+endif (WIN32)
# A queue event listener plugin that creates messages on a replication
# queue corresponding to enqueue and dequeue events:
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:919043-926753
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:919043-928107
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Sat Mar 27 02:26:12 2010
@@ -46,10 +46,12 @@ using std::cout;
using std::endl;
using qpid::messaging::Variant;
-Mutex ManagementAgent::Singleton::lock;
-bool ManagementAgent::Singleton::disabled = false;
-ManagementAgent* ManagementAgent::Singleton::agent = 0;
-int ManagementAgent::Singleton::refCount = 0;
+namespace {
+ Mutex lock;
+ bool disabled = false;
+ ManagementAgent* agent = 0;
+ int refCount = 0;
+}
ManagementAgent::Singleton::Singleton(bool disableManagement)
{
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Queue.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Queue.cpp Sat Mar 27 02:26:12 2010
@@ -502,6 +502,7 @@ void Queue::purgeExpired()
if (lastValueQueue) checkLvqReplace(*i);
if (i->payload->hasExpired()) {
expired.push_back(*i);
+ clearLVQIndex(*i);
i = messages.erase(i);
} else {
++i;
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/SessionImpl.cpp Sat Mar 27 02:26:12 2010
@@ -73,12 +73,12 @@ SessionImpl::~SessionImpl() {
{
Lock l(state);
if (state != DETACHED && state != DETACHING) {
- QPID_LOG(warning, "Session was not closed cleanly: " << id);
- try {
+ if (autoDetach) {
+ QPID_LOG(warning, "Session was not closed cleanly: " << id);
// Inform broker but don't wait for detached as that deadlocks.
// The detached will be ignored as the channel will be invalid.
- if (autoDetach) detach();
- } catch (...) {} // ignore errors.
+ try { detach(); } catch (...) {} // ignore errors.
+ }
setState(DETACHED);
handleClosed();
state.waitWaiters();
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Sat Mar 27 02:26:12 2010
@@ -61,40 +61,54 @@ using namespace boost::assign;
namespace{
const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
+ const Variant::List EMPTY_LIST;
const std::string EMPTY_STRING;
-//option names
-const std::string BROWSE("browse");
-const std::string CONSUME("consume");
-const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NO_LOCAL("no-local");
-const std::string FILTER("filter");
-const std::string RELIABILITY("reliability");
-const std::string NAME("subscription-name");
-const std::string NODE_PROPERTIES("node-properties");
-const std::string X_PROPERTIES("x-properties");
-
//policy types
const std::string CREATE("create");
const std::string ASSERT("assert");
const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
//policy values
const std::string ALWAYS("always");
const std::string NEVER("never");
const std::string RECEIVER("receiver");
const std::string SENDER("sender");
+//address types
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
+//reliability options:
const std::string UNRELIABLE("unreliable");
const std::string AT_MOST_ONCE("at-most-once");
const std::string AT_LEAST_ONCE("at-least-once");
const std::string EXACTLY_ONCE("exactly-once");
-const std::string DURABLE_SUBSCRIPTION("durable");
-const std::string DURABLE("durable");
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
const std::string TOPIC_EXCHANGE("topic");
const std::string FANOUT_EXCHANGE("fanout");
const std::string DIRECT_EXCHANGE("direct");
@@ -103,16 +117,26 @@ const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("*");
}
-//some amqp 0-10 specific options
-namespace xamqp{
-const std::string AUTO_DELETE("auto-delete");
-const std::string EXCHANGE_TYPE("type");
-const std::string EXCLUSIVE("exclusive");
-const std::string ALTERNATE_EXCHANGE("alternate-exchange");
-const std::string QUEUE_ARGUMENTS("x-queue-arguments");
-const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments");
-const std::string BINDINGS("bindings");
-}
+struct Binding
+{
+ Binding(const Variant::Map&);
+ Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+
+ std::string exchange;
+ std::string queue;
+ std::string key;
+ FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+ void add(const Variant::List& bindings);
+ void setDefaultExchange(const std::string&);
+ void setDefaultQueue(const std::string&);
+ void bind(qpid::client::AsyncSession& session);
+ void unbind(qpid::client::AsyncSession& session);
+ void check(qpid::client::AsyncSession& session);
+};
class Node
{
@@ -125,6 +149,8 @@ class Node
Variant createPolicy;
Variant assertPolicy;
Variant deletePolicy;
+ Bindings nodeBindings;
+ Bindings linkBindings;
static bool enabled(const Variant& policy, CheckMode mode);
static bool createEnabled(const Address& address, CheckMode mode);
@@ -133,17 +159,6 @@ class Node
static std::vector<std::string> SENDER_MODES;
};
-struct Binding
-{
- Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-
- std::string exchange;
- std::string key;
- FieldTable options;
-};
-
-typedef std::vector<Binding> Bindings;
-
class Queue : protected Node
{
@@ -154,16 +169,11 @@ class Queue : protected Node
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
private:
- bool durable;
- bool autoDelete;
- bool exclusive;
- std::string alternateExchange;
+ const bool durable;
+ const bool autoDelete;
+ const bool exclusive;
+ const std::string alternateExchange;
FieldTable arguments;
- Bindings bindings;
-
- void configure(const Address&);
- void addBindings(const Variant::List&);
- void addBinding(const std::string&);
};
class Exchange : protected Node
@@ -174,17 +184,14 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- const std::string& getDesiredExchangeType() { return type; }
+ protected:
+ const std::string specifiedType;
private:
- std::string type;
- bool typeSpecified;
- bool durable;
- bool autoDelete;
- std::string alternateExchange;
- FieldTable arguments;
-
- void configure(const Address&);
+ const bool durable;
+ const bool autoDelete;
+ const std::string alternateExchange;
+ FieldTable arguments;
};
class QueueSource : public Queue, public MessageSource
@@ -203,24 +210,22 @@ class QueueSource : public Queue, public
class Subscription : public Exchange, public MessageSource
{
public:
- Subscription(const Address&, const std::string& exchangeType="");
+ Subscription(const Address&, const std::string& actualType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
const std::string queue;
const bool reliable;
const bool durable;
+ const std::string actualType;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
- void bindSpecial(const std::string& exchangeType);
- void bind(const std::string& subject);
- void bind(const std::string& subject, const Variant& filter);
- void bind(const std::string& subject, const Variant::Map& filter);
- void bind(const std::string& subject, const Variant::List& filter);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
- static std::string getSubscriptionName(const std::string& base, const Variant& name);
+ void bindSubject(const std::string& subject);
+ void bindAll();
+ void add(const std::string& exchange, const std::string& key);
+ static std::string getSubscriptionName(const std::string& base, const std::string& name);
};
class ExchangeSink : public Exchange, public MessageSink
@@ -267,14 +272,102 @@ bool getSenderPolicy(const Address& addr
return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
}
+const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0)
+{
+ Variant::Map::const_iterator j = options.find(path[index]);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else if (++index < path.size()) {
+ if (j->second.getType() != qpid::messaging::VAR_MAP)
+ throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str());
+ return getOption(j->second.asMap(), path, index);
+ } else {
+ return j->second;
+ }
+}
+
+const Variant& getOption(const Address& address, const std::vector<std::string>& path)
+{
+ return getOption(address.getOptions(), path);
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else {
+ return j->second;
+ }
+}
+
+struct Opt
+{
+ Opt(const Address& address);
+ Opt(const Variant::Map& base);
+ Opt& operator/(const std::string& name);
+ operator bool() const;
+ std::string str() const;
+ const Variant::List& asList() const;
+ void collect(qpid::framing::FieldTable& args) const;
+
+ const Variant::Map* options;
+ const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+ if (options) {
+ Variant::Map::const_iterator j = options->find(name);
+ if (j == options->end()) {
+ value = 0;
+ options = 0;
+ } else {
+ value = &(j->second);
+ if (value->getType() == qpid::messaging::VAR_MAP) options = &(value->asMap());
+ else options = 0;
+ }
+ }
+ return *this;
+}
+
+
+Opt::operator bool() const
+{
+ return value && !value->isVoid() && value->asBool();
+}
+
+std::string Opt::str() const
+{
+ if (value) return value->asString();
+ else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+ if (value) return value->asList();
+ else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+ if (value) {
+ translate(value->asMap(), args);
+ }
+}
+
bool AddressResolution::is_unreliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
bool AddressResolution::is_reliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::string checkAddressType(qpid::client::Session session, const Address& address)
@@ -282,7 +375,7 @@ std::string checkAddressType(qpid::clien
if (address.getName().empty()) {
throw InvalidAddress("Name cannot be null");
}
- std::string type = address.getType();
+ std::string type = (Opt(address)/NODE/TYPE).str();
if (type.empty()) {
ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
if (result.getQueueNotFound() && result.getExchangeNotFound()) {
@@ -307,7 +400,8 @@ std::auto_ptr<MessageSource> AddressReso
{
std::string type = checkAddressType(session, address);
if (type == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSource> source(new Subscription(address));
+ std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+ std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
QPID_LOG(debug, "treating source address as topic: " << address);
return source;
} else if (type == QUEUE_ADDRESS) {
@@ -337,18 +431,6 @@ std::auto_ptr<MessageSink> AddressResolu
}
}
-const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
-{
- Variant::Map::const_iterator i = options.find(keys[index]);
- if (i == options.end()) {
- return EMPTY_VARIANT;
- } else if (index+1 < keys.size()) {
- return getNestedOption(i->second.asMap(), keys, index+1);
- } else {
- return i->second;
- }
-}
-
bool isBrowse(const Address& address)
{
const Variant& mode = address.getOption(MODE);
@@ -366,23 +448,18 @@ QueueSource::QueueSource(const Address&
acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
exclusive(false)
{
- //extract subscription arguments from address options
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, options);
- }
+ //extract subscription arguments from address options (nb: setting
+ //of accept-mode/acquire-mode/destination controlled through other
+ //options)
+ exclusive = Opt(address)/NODE/LINK/X_SUBSCRIBE/EXCLUSIVE;
+ (Opt(address)/NODE/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
+ linkBindings.bind(session);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -393,58 +470,72 @@ void QueueSource::subscribe(qpid::client
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
checkDelete(session, FOR_RECEIVER);
}
-std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
{
- if (name.isVoid()) {
+ if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return (boost::format("%1%_%2%") % base % name.asString()).str();
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
-Subscription::Subscription(const Address& address, const std::string& exchangeType)
+Subscription::Subscription(const Address& address, const std::string& type)
: Exchange(address),
- queue(getSubscriptionName(name, address.getOption(NAME))),
+ queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
- durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
+ durable(Opt(address)/LINK/DURABLE),
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
{
- if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions);
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, subscriptionOptions);
- }
+ (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
- const Variant& filter = address.getOption(FILTER);
- if (!filter.isVoid()) {
- bind(address.getSubject(), filter);
- } else if (address.hasSubject()) {
- //Note: This will not work for headers- or xml- exchange;
- //fanout exchange will do no filtering.
- //TODO: for headers- or xml- exchange can construct a match
- //for the subject in the application-headers
- bind(address.getSubject());
+ if (!address.getSubject().empty()) bindSubject(address.getSubject());
+ else if (linkBindings.empty()) bindAll();
+}
+
+void Subscription::bindSubject(const std::string& subject)
+{
+ if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, subject);
+ b.arguments.setString("qpid.subject", subject);
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, subject);
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ % subject).str();
+ b.arguments.setString("xquery", query);
+ bindings.push_back(b);
} else {
- //Neither a subject nor a filter has been defined, treat this
- //as wanting to match all messages (Note: direct exchange is
- //currently unable to support this case).
- if (!exchangeType.empty()) bindSpecial(exchangeType);
- else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ //Note: the fanout exchange doesn't support any filtering, so
+ //the subject is ignored in that case
+ add(name, subject);
}
}
-void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+void Subscription::bindAll()
{
- bindings.push_back(Binding(exchange, key, options));
+ if (actualType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (actualType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, "match-all");
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else { //E.g. direct and xml
+ throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+ }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+ bindings.push_back(Binding(exchange, queue, key));
}
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
@@ -456,10 +547,11 @@ void Subscription::subscribe(qpid::clien
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
- //bind subscription queue to exchange:
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
- }
+ //'default' binding:
+ bindings.bind(session);
+ //any explicit bindings:
+ linkBindings.setDefaultQueue(queue);
+ linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
@@ -468,20 +560,19 @@ void Subscription::subscribe(qpid::clien
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
-Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
- exchange(e), key(k), options(o) {}
-
ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
@@ -492,6 +583,7 @@ void ExchangeSink::send(qpid::client::As
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -501,6 +593,7 @@ void QueueSink::declare(qpid::client::As
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -510,6 +603,7 @@ void QueueSink::send(qpid::client::Async
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -556,68 +650,24 @@ bool isTopic(qpid::client::Session sessi
}
}
-void Subscription::bind(const std::string& subject)
-{
- add(name, subject);
-}
-
-void Subscription::bind(const std::string& subject, const Variant& filter)
-{
- switch (filter.getType()) {
- case qpid::messaging::VAR_MAP:
- bind(subject, filter.asMap());
- break;
- case qpid::messaging::VAR_LIST:
- bind(subject, filter.asList());
- break;
- default:
- //TODO: if both subject _and_ filter are specified, combine in
- //some way; for now we just ignore the subject in that case.
- add(name, filter.asString());
- break;
- }
-}
-
-void Subscription::bind(const std::string& subject, const Variant::Map& filter)
-{
- qpid::framing::FieldTable arguments;
- translate(filter, arguments);
- add(name, subject.empty() ? queue : subject, arguments);
-}
-
-void Subscription::bind(const std::string& subject, const Variant::List& filter)
-{
- for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
- bind(subject, *i);
- }
-}
-
-void Subscription::bindSpecial(const std::string& exchangeType)
-{
- if (exchangeType == TOPIC_EXCHANGE) {
- add(name, WILDCARD_ANY);
- } else if (exchangeType == FANOUT_EXCHANGE) {
- add(name, queue);
- } else if (exchangeType == HEADERS_EXCHANGE) {
- //TODO: add special binding for headers exchange to match all messages
- } else if (exchangeType == XML_EXCHANGE) {
- //TODO: add special binding for xml exchange to match all messages
- } else { //E.g. direct
- throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
- }
-}
-
Node::Node(const Address& address) : name(address.getName()),
createPolicy(address.getOption(CREATE)),
assertPolicy(address.getOption(ASSERT)),
- deletePolicy(address.getOption(DELETE)) {}
+ deletePolicy(address.getOption(DELETE))
+{
+ nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+ linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
Queue::Queue(const Address& a) : Node(a),
- durable(false),
- autoDelete(false),
- exclusive(false)
-{
- configure(a);
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultQueue(name);
+ linkBindings.setDefaultQueue(name);
}
void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
@@ -634,14 +684,7 @@ void Queue::checkCreate(qpid::client::As
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
}
- try {
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- }
- session.sync();
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str());
- }
+ nodeBindings.bind(session);
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
@@ -694,82 +737,38 @@ void Queue::checkAssert(qpid::client::As
% i->first % name % *(i->second) % *v).str());
}
}
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str());
- }
- }
- }
- }
-}
-
-void Queue::addBinding(const std::string& b)
-{
- string::size_type i = b.find('/');
- if (i == string::npos) {
- bindings.push_back(Binding(b, EMPTY_STRING));
- } else {
- std::string exchange = b.substr(0, i);
- if (i+1 < b.size()) {
- bindings.push_back(Binding(exchange, b.substr(i+1)));
- } else {
- bindings.push_back(Binding(exchange, EMPTY_STRING));
- }
- }
-}
-
-void Queue::addBindings(const Variant::List& list)
-{
- for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- addBinding(i->asString());
- }
-}
-
-void Queue::configure(const Address& address)
-{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList());
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+ nodeBindings.check(session);
}
}
}
Exchange::Exchange(const Address& a) : Node(a),
- type(TOPIC_EXCHANGE),
- typeSpecified(false),
- durable(false),
- autoDelete(false)
-{
- configure(a);
+ specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
+{
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultExchange(name);
+ linkBindings.setDefaultExchange(name);
}
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
sync(session).exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
}
+ nodeBindings.bind(session);
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
@@ -800,9 +799,9 @@ void Exchange::checkAssert(qpid::client:
if (result.getNotFound()) {
throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
} else {
- if (typeSpecified && result.getType() != type) {
+ if (specifiedType.size() && result.getType() != specifiedType) {
throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
- % name % type % result.getType()).str());
+ % name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
@@ -819,31 +818,79 @@ void Exchange::checkAssert(qpid::client:
% i->first % name % *(i->second) % *v).str());
}
}
+ nodeBindings.check(session);
}
}
}
-void Exchange::configure(const Address& address)
+Binding::Binding(const Variant::Map& b) :
+ exchange((Opt(b)/EXCHANGE).str()),
+ queue((Opt(b)/QUEUE).str()),
+ key((Opt(b)/KEY).str())
{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+ (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ push_back(Binding(i->asMap()));
+ }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->exchange.empty()) i->exchange = exchange;
+ }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->queue.empty()) i->queue = queue;
+ }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+ try {
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
}
+ session.sync();
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
}
}
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeUnbind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ % i->exchange % i->queue % i->key).str());
+ }
+ }
+}
bool Node::enabled(const Variant& policy, CheckMode mode)
{
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp Sat Mar 27 02:26:12 2010
@@ -34,7 +34,11 @@ using sys::Timer;
using sys::TimerTask;
-ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {}
+ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {
+ // Allow more generous overrun threshold with cluster as we
+ // have to do a CPG round trip before executing the task.
+ overran = 10*sys::TIME_MSEC;
+}
ClusterTimer::~ClusterTimer() {}
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sat Mar 27 02:26:12 2010
@@ -399,7 +399,10 @@ void UpdateClient::updateSession(broker:
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
-
+
+ // Sync the session to ensure all responses from broker have been processed.
+ shadowSession.sync();
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:919043-926753
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:919043-928107
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:919043-926753
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:919043-928107
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:919043-926753
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:919043-928107
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:919043-926753
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:919043-928107
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Address.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Address.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/messaging/Address.cpp Sat Mar 27 02:26:12 2010
@@ -114,7 +114,7 @@ void Address::setOptions(const Variant::
namespace{
const Variant EMPTY_VARIANT;
const std::string EMPTY_STRING;
-const std::string NODE_PROPERTIES="node-properties";
+const std::string NODE_PROPERTIES="node";
}
const Variant& find(const Variant::Map& map, const std::string& key)
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.cpp Sat Mar 27 02:26:12 2010
@@ -76,7 +76,10 @@ void TimerTask::cancel() {
}
Timer::Timer() :
- active(false)
+ active(false),
+ late(50 * TIME_MSEC),
+ overran(2 * TIME_MSEC),
+ lateCancel(500 * TIME_MSEC)
{
start();
}
@@ -105,7 +108,7 @@ void Timer::run()
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
drop(t);
- if (delay > 500 * TIME_MSEC) {
+ if (delay > lateCancel) {
QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC
<< "ms late");
}
@@ -116,20 +119,19 @@ void Timer::run()
// Warn on callback overrun
AbsTime end(AbsTime::now());
Duration overrun(tasks.top()->nextFireTime, end);
- bool late = delay > 50 * TIME_MSEC;
- bool overran = overrun > 2 * TIME_MSEC;
- if (late)
- if (overran) {
+ if (delay > late) {
+ if (overrun > overran) {
+ QPID_LOG(warning,
+ "Timer woken up " << delay / TIME_MSEC << "ms late, "
+ "overrunning by " << overrun / TIME_MSEC << "ms [taking "
+ << Duration(start, end) << "]");
+ } else {
+ QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
+ }
+ } else if (overrun > overran) {
QPID_LOG(warning,
- "Timer woken up " << delay / TIME_MSEC << "ms late, "
- "overrunning by " << overrun / TIME_MSEC << "ms [taking "
- << Duration(start, end) << "]");
- } else {
- QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
- } else if (overran) {
- QPID_LOG(warning,
- "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
- << Duration(start, end) << "]");
+ "Timer callback overran by " << overrun / TIME_MSEC <<
+ "ms [taking " << Duration(start, end) << "]");
}
continue;
} else {
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.h?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/sys/Timer.h Sat Mar 27 02:26:12 2010
@@ -92,6 +92,10 @@ class Timer : private Runnable {
protected:
QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
+ // Allow derived classes to change the late/overran thresholds.
+ Duration late;
+ Duration overran;
+ Duration lateCancel;
};
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/MessagingSessionTests.cpp Sat Mar 27 02:26:12 2010
@@ -564,13 +564,13 @@ struct QueueCreatePolicyFixture : public
QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
{
- QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
{
- QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -578,7 +578,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyQueu
QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
{
- QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -608,14 +608,14 @@ struct ExchangeCreatePolicyFixture : pub
QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
{
- ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}",
+ ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}",
"topic");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -623,7 +623,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyTopi
QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -746,18 +746,18 @@ QPID_AUTO_TEST_CASE(testDeletePolicyExch
QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
{
MessagingFixture fix;
- std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
+ std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s1 = fix.session.createSender(a1);
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
- std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
+ std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
- std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
+ std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}";
BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
Receiver r3 = fix.session.createReceiver(a3);
r3.close();
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster.mk?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster.mk Sat Mar 27 02:26:12 2010
@@ -60,7 +60,7 @@ EXTRA_DIST += \
cluster_tests.fail
LONG_TESTS += \
- run_long_cluster_tests \
+ run_long_cluster_tests \
start_cluster \
cluster_python_tests \
stop_cluster
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py Sat Mar 27 02:26:12 2010
@@ -156,11 +156,11 @@ class LongTests(BrokerTest):
ErrorGenerator(b)
time.sleep(min(5,self.duration()/2))
sender.stop()
- receiver.stop(sender.sent)
+ receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
- """Run management clients and other clients concurrently."""
+ """Stress test: Run management clients and other clients concurrently."""
# TODO aconway 2010-03-03: move to brokertest framework
class ClientLoop(StoppableThread):
@@ -171,6 +171,7 @@ class LongTests(BrokerTest):
self.cmd = cmd # Client command.
self.lock = Lock()
self.process = None # Client process.
+ self._expect_fail = False
self.start()
def run(self):
@@ -195,7 +196,7 @@ class LongTests(BrokerTest):
try:
# Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
- if exit != 0:
+ if not self._expect_fail and exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
finally: self.lock.release()
@@ -205,12 +206,13 @@ class LongTests(BrokerTest):
def expect_fail(self):
"""Ignore exit status of the currently running client."""
self.lock.acquire()
- stopped = True
+ self._expect_fail = True
self.lock.release()
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
+ if self.stopped: return
try:
self.stopped = True
if self.process:
@@ -228,45 +230,44 @@ class LongTests(BrokerTest):
clients = [] # Ordinary clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
- def start_clients(broker):
- """Start ordinary clients for a broker"""
- batch = []
- for cmd in [
- ["perftest", "--count", 1000,
+ def start_clients(broker, i):
+ """Start ordinary clients for a broker. Start one client per broker.
+ Round-robin on a collection of different clients."""
+ cmds=[
+ ["perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
- ["testagent", "localhost", str(broker.port())] ]:
- batch.append(ClientLoop(broker, cmd))
- clients.append(batch)
+ ["testagent", "localhost", str(broker.port())] ]
+ clients.append([ClientLoop(broker, cmds[i%len(cmds)])])
def start_mclients(broker):
- """Start management clients that make multiple connections"""
- for cmd in [
- ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
- ]:
- mclients.append(ClientLoop(broker, cmd))
+ """Start management clients that make multiple connections."""
+ cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+ mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
alive = 0 # First live cluster member
- for b in cluster:
- start_clients(b)
- start_mclients(b)
+ for i in range(len(cluster)):
+ start_clients(cluster[i], i)
+ start_mclients(cluster[alive])
while time.time() < endtime:
time.sleep(min(5,self.duration()/2))
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
- # Kill the first broker. Ignore errors on its clients and all the mclients
+ # Kill the first broker, expect the clients to fail.
for c in clients[alive] + mclients: c.expect_fail()
- clients[alive] = []
- mclients = []
b = cluster[alive]
b.expect = EXPECT_EXIT_FAIL
b.kill()
+ # Stop the broker's clients and all the mclients.
+ for c in clients[alive] + mclients: c.stop()
+ clients[alive] = []
+ mclients = []
# Start another broker and clients
alive += 1
- b = cluster.start()
- start_clients(b)
- for b in cluster[alive:]: start_mclients(b)
+ cluster.start()
+ start_clients(cluster[-1], len(cluster)-1)
+ start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
Propchange: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Mar 27 02:26:12 2010
@@ -1 +1 @@
-/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:919043-926753
+/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:919043-928107
Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/run_long_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/run_long_cluster_tests?rev=928123&r1=928122&r2=928123&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/run_long_cluster_tests (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/tests/run_long_cluster_tests Sat Mar 27 02:26:12 2010
@@ -20,4 +20,5 @@
#
srcdir=`dirname $0`
-$srcdir/run_cluster_tests long_cluster_tests
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org