You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/19 16:03:19 UTC

svn commit: r1072356 [1/2] - in /qpid/trunk/qpid: ./ cpp/examples/tradedemo/ cpp/include/qmf/engine/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/qpid/management/ cpp/src/qpid/sys/ cpp/src/tests/ dotnet/ java/ java/br...

Author: kgiusti
Date: Sat Feb 19 15:03:16 2011
New Revision: 1072356

URL: http://svn.apache.org/viewvc?rev=1072356&view=rev
Log:
QPID-2935: merge producer flow control (C++ broker).

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
    qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/tests/queue_flow_limit_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_queue_flow_limit_tests
      - copied unchanged from r1072333, qpid/branches/qpid-2935/qpid/cpp/src/tests/run_queue_flow_limit_tests
Removed:
    qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
    qpid/trunk/qpid/cpp/src/tests/IncompleteMessageList.cpp
Modified:
    qpid/trunk/qpid/   (props changed)
    qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj   (props changed)
    qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h   (props changed)
    qpid/trunk/qpid/cpp/include/qmf/engine/Console.h   (props changed)
    qpid/trunk/qpid/cpp/src/CMakeLists.txt   (contents, props changed)
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
    qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py   (contents, props changed)
    qpid/trunk/qpid/dotnet/build-msbuild.bat   (props changed)
    qpid/trunk/qpid/dotnet/build-nant-release   (props changed)
    qpid/trunk/qpid/dotnet/build-nant.bat   (props changed)
    qpid/trunk/qpid/java/   (props changed)
    qpid/trunk/qpid/java/broker/   (props changed)
    qpid/trunk/qpid/java/broker/bin/   (props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/   (props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/   (props changed)
    qpid/trunk/qpid/java/client/src/main/java/client.log4j   (props changed)
    qpid/trunk/qpid/java/integrationtests/src/resources/sustained-log4j.xml   (props changed)
    qpid/trunk/qpid/java/management/client/etc/qman.log4j   (props changed)
    qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/   (props changed)
    qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management/   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java   (props changed)
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java   (props changed)
    qpid/trunk/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc   (props changed)
    qpid/trunk/qpid/java/perftests/etc/perftests.log4j   (props changed)
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/MultipleJCAProviderRegistrationTest.java   (props changed)
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java   (props changed)
    qpid/trunk/qpid/java/systests/src/main/java/systests.log4j   (props changed)
    qpid/trunk/qpid/java/test-profiles/   (props changed)
    qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/CPPExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/Excludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/JavaExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/JavaStandaloneExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/XAExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/clean-dir   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/cpp.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/default.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/java-derby.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/java.testprofile   (props changed)
    qpid/trunk/qpid/java/test-profiles/log4j-test.xml   (props changed)
    qpid/trunk/qpid/java/test-profiles/test-provider.properties   (props changed)
    qpid/trunk/qpid/java/test-profiles/test_resources/   (props changed)
    qpid/trunk/qpid/java/tools/etc/test.log4j   (props changed)
    qpid/trunk/qpid/packaging/windows/   (props changed)
    qpid/trunk/qpid/python/   (props changed)
    qpid/trunk/qpid/python/examples/api/spout   (props changed)
    qpid/trunk/qpid/python/qpid/concurrency.py   (props changed)
    qpid/trunk/qpid/ruby/ext/sasl/extconf.rb   (props changed)
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py   (props changed)
    qpid/trunk/qpid/tools/src/py/qpid-config

Propchange: qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -2,3 +2,4 @@
 /qpid/branches/0.6-release-windows-installer:926803
 /qpid/branches/0.6-release-windows-installer/qpid:926803,927233
 /qpid/branches/java-network-refactor/qpid:805429-825319
+/qpid/branches/qpid-2935/qpid:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -2,3 +2,4 @@
 /qpid/branches/0.6-release-windows-installer/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:926803,927218,927233
 /qpid/branches/java-network-refactor/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:805429-825319
+/qpid/branches/qpid-2935/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/include/qmf/engine/Agent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Agent.h:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/include/qmf/engine/Console.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/include/qmf/engine/Console.h:1061302-1072333

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Sat Feb 19 15:03:16 2011
@@ -985,7 +985,6 @@ set (qpidbroker_SOURCES
      qpid/broker/ExchangeRegistry.cpp
      qpid/broker/FanOutExchange.cpp
      qpid/broker/HeadersExchange.cpp
-     qpid/broker/IncompleteMessageList.cpp
      qpid/broker/Link.cpp
      qpid/broker/LinkRegistry.cpp
      qpid/broker/Message.cpp
@@ -998,6 +997,7 @@ set (qpidbroker_SOURCES
      qpid/broker/QueueEvents.cpp
      qpid/broker/QueuePolicy.cpp
      qpid/broker/QueueRegistry.cpp
+     qpid/broker/QueueFlowLimit.cpp
      qpid/broker/RateTracker.cpp
      qpid/broker/RecoveryManagerImpl.cpp
      qpid/broker/RecoveredEnqueue.cpp

Propchange: qpid/trunk/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -2,3 +2,4 @@
 /qpid/branches/0.6-release-windows-installer/cpp/src/CMakeLists.txt:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src/CMakeLists.txt:926803,927233,932132
 /qpid/branches/java-network-refactor/qpid/cpp/src/CMakeLists.txt:805429-825319
+/qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt:1061302-1072333

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Sat Feb 19 15:03:16 2011
@@ -561,8 +561,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HandlerImpl.h \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/HeadersExchange.h \
-  qpid/broker/IncompleteMessageList.cpp \
-  qpid/broker/IncompleteMessageList.h \
+  qpid/broker/AsyncCompletion.h \
   qpid/broker/LegacyLVQ.h \
   qpid/broker/LegacyLVQ.cpp \
   qpid/broker/Link.cpp \
@@ -612,6 +611,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueueRegistry.cpp \
   qpid/broker/QueueRegistry.h \
   qpid/broker/QueuedMessage.h \
+  qpid/broker/QueueFlowLimit.h \
+  qpid/broker/QueueFlowLimit.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RateTracker.cpp \
   qpid/broker/RateTracker.h \

Propchange: qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1072333

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Sat Feb 19 15:03:16 2011
@@ -32,6 +32,7 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
@@ -118,7 +119,9 @@ Broker::Options::Options(const std::stri
     maxSessionRate(0),
     asyncQueueEvents(false),     // Must be false in a cluster.
     qmf2Support(true),
-    qmf1Support(true)
+    qmf1Support(true),
+    queueFlowStopRatio(80),
+    queueFlowResumeRatio(70)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -151,7 +154,9 @@ Broker::Options::Options(const std::stri
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
         ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
         ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
-        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+        ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.")
+        ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated.");
 }
 
 const std::string empty;
@@ -182,8 +187,9 @@ Broker::Broker(const Broker::Options& co
             conf.replayHardLimit*1024),
         *this),
     queueCleaner(queues, timer),
-    queueEvents(poller,!conf.asyncQueueEvents), 
+    queueEvents(poller,!conf.asyncQueueEvents),
     recovery(true),
+    inCluster(false),
     clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     connectionCounter(conf.maxConnections),
@@ -240,8 +246,16 @@ Broker::Broker(const Broker::Options& co
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
+    /** todo KAG - remove once cluster support for flow control is done */
+    if (isInCluster()) {
+        QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
+        QueueFlowLimit::setDefaults(0, 0, 0);
+    } else {
+        QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+    }
+
     // If no plugin store module registered itself, set up the null store.
-    if (NullMessageStore::isNullStore(store.get())) 
+    if (NullMessageStore::isNullStore(store.get()))
         setStore();
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -360,14 +374,14 @@ void Broker::run() {
         Dispatcher d(poller);
         int numIOThreads = config.workerThreads;
         std::vector<Thread> t(numIOThreads-1);
-        
+
         // Run n-1 io threads
         for (int i=0; i<numIOThreads-1; ++i)
             t[i] = Thread(d);
-        
+
         // Run final thread
         d.run();
-        
+
         // Now wait for n-1 io threads to exit
         for (int i=0; i<numIOThreads-1; ++i) {
             t[i].join();
@@ -414,9 +428,9 @@ Manageable::status_t Broker::ManagementM
     {
     case _qmf::Broker::METHOD_ECHO :
         QPID_LOG (debug, "Broker::echo("
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence 
-                  << ", " 
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body 
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+                  << ", "
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
                   << ")");
         status = Manageable::STATUS_OK;
         break;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Sat Feb 19 15:03:16 2011
@@ -118,29 +118,31 @@ public:
         bool asyncQueueEvents;
         bool qmf2Support;
         bool qmf1Support;
+        uint queueFlowStopRatio;    // producer flow control: on
+        uint queueFlowResumeRatio;  // producer flow control: off
 
       private:
         std::string getHome();
     };
-    
+
     class ConnectionCounter {
             int maxConnections;
             int connectionCount;
             sys::Mutex connectionCountLock;
         public:
             ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
-            void inc_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            void inc_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount++;
-            } 
-            void dec_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            }
+            void dec_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount--;
             }
             bool allowConnection() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 return (maxConnections <= connectionCount);
-            } 
+            }
     };
 
   private:
@@ -182,7 +184,7 @@ public:
                            const boost::intrusive_ptr<Message>& msg);
     std::string federationTag;
     bool recovery;
-    bool clusterUpdatee;
+    bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
 
@@ -241,7 +243,7 @@ public:
     QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, uint16_t port, 
+    void connect(const std::string& host, uint16_t port,
                  const std::string& transport,
                  boost::function2<void, int, std::string> failed,
                  sys::ConnectionCodec::Factory* =0);
@@ -253,9 +255,9 @@ public:
     /** Move messages from one queue to another.
         A zero quantity means to move all messages
     */
-    uint32_t queueMoveMessages( const std::string& srcQueue, 
+    uint32_t queueMoveMessages( const std::string& srcQueue,
 			    const std::string& destQueue,
-			    uint32_t  qty); 
+			    uint32_t  qty);
 
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
 
@@ -279,8 +281,17 @@ public:
     void setRecovery(bool set) { recovery = set; }
     bool getRecovery() const { return recovery; }
 
-    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+    /** True if this broker is part of a cluster.
+     * Only valid after early initialization of plugins is complete.
+     */
+    bool isInCluster() const { return inCluster; }
+    void setInCluster(bool set) { inCluster = set; }
+
+    /** True if this broker is joining a cluster and in the process of
+     * receiving a state update.
+     */
     bool isClusterUpdatee() const { return clusterUpdatee; }
+    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Sat Feb 19 15:03:16 2011
@@ -278,8 +278,7 @@ void Connection::setUserId(const string&
     ConnectionState::setUserId(userId);
     // In a cluster, the cluster code will raise the connect event
     // when the connection is replicated to the cluster.
-    if (!sys::isCluster())
-        raiseConnectEvent();
+    if (!broker.isInCluster()) raiseConnectEvent();
 }
 
 void Connection::raiseConnectEvent() {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Sat Feb 19 15:03:16 2011
@@ -134,7 +134,7 @@ void Link::established ()
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
     // Don't raise the management event in a cluster, other members wont't get this call.
-    if (!sys::isCluster()) 
+    if (broker && !broker->isInCluster())
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
 
     {
@@ -159,7 +159,7 @@ void Link::closed (int, std::string text
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
-        if (!sys::isCluster())
+        if (broker && !broker->isInCluster())
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Sat Feb 19 15:03:16 2011
@@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER;
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) {}
+    expiration(FAR_FUTURE), dequeueCallback(0),
+    inCallback(false), requiredCredit(0)
+{}
 
 Message::Message(const Message& original) :
     PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
-    inCallback(false), requiredCredit(0) 
+    expiration(original.expiration), dequeueCallback(0),
+    inCallback(false), requiredCredit(0)
 {
     setExpiryPolicy(original.expiryPolicy);
 }
@@ -415,30 +416,12 @@ struct ScopedSet {
 };
 }
 
-void Message::allEnqueuesComplete() {
-    ScopedSet ss(callbackLock, inCallback);
-    MessageCallback* cb = enqueueCallback;
-    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
 void Message::allDequeuesComplete() {
     ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    enqueueCallback = 0;
-}
-
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
     while (inCallback) callbackLock.wait();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Sat Feb 19 15:03:16 2011
@@ -154,10 +154,6 @@ public:
        bool isForcedPersistent();
     
 
-    /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
-    void setEnqueueCompleteCallback(MessageCallback& cb);
-    void resetEnqueueCompleteCallback();
-
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
@@ -166,7 +162,6 @@ public:
 
   private:
     MessageAdapter& getAdapter() const;
-    void allEnqueuesComplete();
     void allDequeuesComplete();
 
     mutable sys::Mutex lock;
@@ -187,7 +182,6 @@ public:
     mutable boost::intrusive_ptr<Message> empty;
 
     sys::Monitor callbackLock;
-    MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
     bool inCallback;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Sat Feb 19 15:03:16 2011
@@ -34,7 +34,6 @@ class MessageStore;
 PersistableMessage::~PersistableMessage() {}
 
 PersistableMessage::PersistableMessage() :
-    asyncEnqueueCounter(0), 
     asyncDequeueCounter(0),
     store(0)
 {}
@@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleas
     return contentReleaseState.released;
 }
        
-bool PersistableMessage::isEnqueueComplete() {
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    return asyncEnqueueCounter == 0;
-}
-
-void PersistableMessage::enqueueComplete() {
-    bool notify = false;
-    {
-        sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-        if (asyncEnqueueCounter > 0) {
-            if (--asyncEnqueueCounter == 0) {
-                notify = true;
-            }
-        }
-    }
-    if (notify) 
-        allEnqueuesComplete();
-}
 
 bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
     if (store && (queue->getPersistenceId()!=0)) {
@@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(P
 
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
     addToSyncList(queue, _store);
-    enqueueAsync();
-}
-
-void PersistableMessage::enqueueAsync() { 
-    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
-    asyncEnqueueCounter++; 
+    enqueueStart();
 }
 
 bool PersistableMessage::isDequeueComplete() { 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Sat Feb 19 15:03:16 2011
@@ -31,6 +31,7 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/AsyncCompletion.h"
 
 namespace qpid {
 namespace broker {
@@ -43,18 +44,19 @@ class MessageStore;
 class PersistableMessage : public Persistable
 {
     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Mutex asyncEnqueueLock;
     sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
-       
+
     /**
-     * Tracks the number of outstanding asynchronous enqueue
-     * operations. When the message is enqueued asynchronously the
-     * count is incremented; when that enqueue completes it is
-     * decremented. Thus when it is 0, there are no outstanding
-     * enqueues.
+     * "Ingress" messages == messages sent _to_ the broker.
+     * Tracks the number of outstanding asynchronous operations that must
+     * complete before an inbound message can be considered fully received by the
+     * broker.  E.g. all enqueues have completed, the message has been written
+     * to store, credit has been replenished, etc. Once all outstanding
+     * operations have completed, the transfer of this message from the client
+     * may be considered complete.
      */
-    int asyncEnqueueCounter;
+    boost::shared_ptr<AsyncCompletion> ingressCompletion;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -65,7 +67,6 @@ class PersistableMessage : public Persis
      */
     int asyncDequeueCounter;
 
-    void enqueueAsync();
     void dequeueAsync();
 
     syncList synclist;
@@ -80,8 +81,6 @@ class PersistableMessage : public Persis
     ContentReleaseState contentReleaseState;
 
   protected:
-    /** Called when all enqueues are complete for this message. */
-    virtual void allEnqueuesComplete() = 0;
     /** Called when all dequeues are complete for this message. */
     virtual void allDequeuesComplete() = 0;
 
@@ -115,9 +114,13 @@ class PersistableMessage : public Persis
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
-    QPID_BROKER_EXTERN bool isEnqueueComplete();
+    /** track the progress of a message received by the broker - see ingressCompletion above */
+    QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); }
+    QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; }
+    QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; }
 
-    QPID_BROKER_EXTERN void enqueueComplete();
+    QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); }
+    QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
@@ -133,7 +136,6 @@ class PersistableMessage : public Persis
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     
     void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-    
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Sat Feb 19 15:03:16 2011
@@ -31,6 +31,7 @@
 #include "qpid/broker/MessageStore.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/ThresholdAlerts.h"
 
 #include "qpid/StringUtils.h"
@@ -163,13 +164,8 @@ void Queue::deliver(boost::intrusive_ptr
         //drop message
         QPID_LOG(info, "Dropping excluded message from " << getName());
     } else {
-        // if no store then mark as enqueued
-        if (!enqueue(0, msg)){
-            push(msg);
-            msg->enqueueComplete();
-        }else {
-            push(msg);
-        }
+        enqueue(0, msg);
+        push(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -546,7 +542,7 @@ void Queue::push(boost::intrusive_ptr<Me
 
 void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
 {
-    if (message.payload->isEnqueueComplete()) (*result)++;
+    if (message.payload->isIngressComplete()) (*result)++;
 }
 
 /** function only provided for unit tests, or code not in critical message path */
@@ -819,11 +815,14 @@ void Queue::configure(const FieldTable& 
     if (autoDeleteTimeout) 
         QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
 
-    if (mgmtObject != 0)
+    if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+    }
 
     if ( isDurable() && ! getPersistenceId() && ! recovering )
       store->create(*this, _settings);
+
+    QueueFlowLimit::observe(*this, _settings);
 }
 
 void Queue::destroyed()
@@ -1176,6 +1175,7 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
+
 bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
                  const qpid::framing::FieldTable& arguments)
 {
@@ -1190,6 +1190,13 @@ bool Queue::bind(boost::shared_ptr<Excha
     }
 }
 
+
+const Broker* Queue::getBroker()
+{
+    return broker;
+}
+
+
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Sat Feb 19 15:03:16 2011
@@ -363,6 +363,8 @@ class Queue : public boost::enable_share
     void recoverPrepared(boost::intrusive_ptr<Message>& msg);
 
     void flush();
+
+    const Broker* getBroker();
 };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Sat Feb 19 15:03:16 2011
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/SessionState.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -509,7 +510,12 @@ framing::MessageResumeResult SessionAdap
     
 
 
-void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+    session.addPendingExecutionSync();
+    /** @todo KAG - need a generic mechanism to allow a command to return a "not completed" status back to SessionState */
+
+}
 
 void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Sat Feb 19 15:03:16 2011
@@ -46,6 +46,7 @@ class SessionContext : public OwnershipT
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
+    virtual void addPendingExecutionSync() = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Sat Feb 19 15:03:16 2011
@@ -60,9 +60,9 @@ SessionState::SessionState(
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore()),
-      enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
       mgmtObject(0),
-      rateFlowcontrol(0)
+      rateFlowcontrol(0),
+      scheduledCompleterContext(new ScheduledCompleterContext(this))
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -101,6 +101,26 @@ SessionState::~SessionState() {
 
     if (flowControlTimer)
         flowControlTimer->cancel();
+
+    // clean up any outstanding incomplete commands
+    {
+        qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+        incompleteCmds.clear();
+        while (!copy.empty()) {
+            boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
+            copy.erase(copy.begin());
+            {
+                // note: need to drop lock, as callback may attempt to take it.
+                qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
+                ref->cancel();
+            }
+        }
+    }
+
+    // At this point, we are guaranteed no further completion callbacks will be
+    // made.  Cancel any outstanding scheduledCompleter calls...
+    scheduledCompleterContext->cancel();
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -202,15 +222,17 @@ Manageable::status_t SessionState::Manag
 }
 
 void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+    currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
     Invoker::Result invocation = invoke(adapter, *method);
-    receiverCompleted(id);
+    if (currentCommandComplete) receiverCompleted(id);
+
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
     } else if (invocation.hasResult()) {
         getProxy().getExecution().result(id, invocation.getResult());
     }
-    if (method->isSync()) {
-        incomplete.process(enqueuedOp, true);
+
+    if (method->isSync() && currentCommandComplete) {
         sendAcceptAndCompletion();
     }
 }
@@ -254,22 +276,13 @@ void SessionState::handleContent(AMQFram
             msg->getFrames().append(header);
         }
         msg->setPublisher(&getConnection());
+
+        boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
+        msg->setIngressCompletion( ac );
+        ac->begin();
         semanticState.handle(msg);
         msgBuilder.end();
-
-        if (msg->isEnqueueComplete()) {
-            enqueued(msg);
-        } else {
-            incomplete.add(msg);
-        }
-
-        //hold up execution until async enqueue is complete
-        if (msg->getFrames().getMethod()->isSync()) {
-            incomplete.process(enqueuedOp, true);
-            sendAcceptAndCompletion();
-        } else {
-            incomplete.process(enqueuedOp, false);
-        }
+        ac->end();  // allows msg to complete xfer
     }
 
     // Handle producer session flow control
@@ -319,11 +332,38 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc).  At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(SequenceNumber id,
+                                  bool requiresAccept,
+                                  bool requiresSync)
 {
-    receiverCompleted(msg->getCommandId());
-    if (msg->requiresAccept())
-        accepted.add(msg->getCommandId());
+    bool callSendCompletion = false;
+    receiverCompleted(id);
+    if (requiresAccept)
+        // will cause msg's seq to appear in the next message.accept we send.
+        accepted.add(id);
+
+    // Are there any outstanding Execution.Sync commands pending the
+    // completion of this msg?  If so, complete them.
+    while (!pendingExecutionSyncs.empty() &&
+           receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+        const SequenceNumber id = pendingExecutionSyncs.front();
+        pendingExecutionSyncs.pop();
+        QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
+        receiverCompleted(id);
+        callSendCompletion = true;   // the peer is likely waiting for this completion.
+    }
+
+    // if the sender has requested immediate notification of the completion...
+    if (requiresSync) {
+        sendAcceptAndCompletion();
+    } else if (callSendCompletion) {
+        sendCompletion();
+    }
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
@@ -396,4 +436,126 @@ framing::AMQP_ClientProxy& SessionState:
     return handler->getClusterOrderProxy();
 }
 
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+    SequenceNumber syncCommandId = receiverGetCurrent();
+    if (receiverGetIncomplete().front() < syncCommandId) {
+        currentCommandComplete = false;
+        pendingExecutionSyncs.push(syncCommandId);
+        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+    }
+}
+
+
+/** factory for creating IncompleteIngressMsgXfer objects which
+ * can be referenced from Messages as ingress AsyncCompletion objects.
+ */
+boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
+SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+{
+    SequenceNumber id = msg->getCommandId();
+    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
+    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+    incompleteCmds[id] = cmd;
+    return cmd;
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion.  May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
+{
+    if (!sync) {
+        /** note well: this path may execute in any thread.  It is safe to access
+         * the session, as the SessionState destructor will cancel all outstanding
+         * callbacks before getting destroyed (so we'll never get here).
+         */
+        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+        if (session->scheduledCompleterContext->scheduleCompletion(id))
+            session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+                                                                     session->scheduledCompleterContext));
+    } else {  // command is being completed in IO thread.
+        // this path runs only on the IO thread.
+        qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+        cmd = session->incompleteCmds.find(id);
+        if (cmd != session->incompleteCmds.end()) {
+            boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+            session->incompleteCmds.erase(cmd);
+
+            if (session->isAttached()) {
+                QPID_LOG(debug, ": receive completed for msg seq=" << id);
+                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+                session->completeRcvMsg(id, requiresAccept, requiresSync);
+                return;
+            }
+        }
+    }
+}
+
+
+/** Scheduled from incomplete command's completed callback, safely completes all
+ * completed commands in the IO Thread.  Guaranteed not to be running at the same
+ * time as the message receive code.
+ */
+void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+{
+    ctxt->completeCommands();
+}
+
+
+/** mark a command (sequence) as completed, return True if caller should
+ * schedule a call to completeCommands()
+ */
+bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+    completedCmds.push_back(cmd);
+    return (completedCmds.size() == 1);
+}
+
+
+/** Cause the session to complete all completed commands */
+void SessionState::ScheduledCompleterContext::completeCommands()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+    // when session is destroyed, it clears the session pointer via cancel().
+    if (!session) return;
+
+    while (!completedCmds.empty()) {
+        SequenceNumber id = completedCmds.front();
+        completedCmds.pop_front();
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+        {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
+
+            cmd = session->incompleteCmds.find(id);
+            if (cmd !=session->incompleteCmds.end()) {
+                boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+                {
+                    qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
+                    tmp->do_completion();   // retakes incompleteCmdslock
+                }
+            }
+        }
+    }
+}
+
+
+/** cancel any pending calls to scheduledCompleter */
+void SessionState::ScheduledCompleterContext::cancel()
+{
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+    session = 0;
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Sat Feb 19 15:03:16 2011
@@ -30,10 +30,11 @@
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
 #include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -123,6 +124,10 @@ class SessionState : public qpid::Sessio
 
     const SessionId& getSessionId() const { return getId(); }
 
+    // Used by ExecutionHandler sync command processing.  Notifies
+    // the SessionState of a received Execution.Sync command.
+    void addPendingExecutionSync();
+
     // Used to delay creation of management object for sessions
     // belonging to inter-broker bridges
     void addManagementObject();
@@ -130,7 +135,10 @@ class SessionState : public qpid::Sessio
   private:
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-    void enqueued(boost::intrusive_ptr<Message> msg);
+
+    // indicate that the given ingress msg has been completely received by the
+    // broker, and the msg's message.transfer command can be considered completed.
+    void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -156,8 +164,6 @@ class SessionState : public qpid::Sessio
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;
-    IncompleteMessageList incomplete;
-    IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
 
@@ -166,6 +172,84 @@ class SessionState : public qpid::Sessio
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
+    // sequence numbers for pending received Execution.Sync commands
+    std::queue<SequenceNumber> pendingExecutionSyncs;
+    bool currentCommandComplete;
+
+    /** Abstract class that represents a command that is pending
+     * completion.
+     */
+    class IncompleteCommandContext : public AsyncCompletion
+    {
+     public:
+        IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
+          : id(_id), session(ss) {}
+        virtual ~IncompleteCommandContext() {}
+
+        /* allows manual invocation of completion, used by IO thread to
+         * complete a command that was originally finished on a different
+         * thread.
+         */
+        void do_completion() { completed(true); }
+
+     protected:
+        SequenceNumber id;
+        SessionState    *session;
+    };
+
+    /** incomplete Message.transfer commands - inbound to broker from client
+     */
+    class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+    {
+     public:
+        IncompleteIngressMsgXfer( SessionState *ss,
+                                  SequenceNumber _id,
+                                  boost::intrusive_ptr<Message> msg )
+          : IncompleteCommandContext(ss, _id),
+          requiresAccept(msg->requiresAccept()),
+          requiresSync(msg->getFrames().getMethod()->isSync()) {};
+        virtual ~IncompleteIngressMsgXfer() {};
+
+     protected:
+        virtual void completed(bool);
+
+     private:
+        /** meta-info required to complete the message */
+        bool requiresAccept;
+        bool requiresSync;  // method's isSync() flag
+    };
+    /** creates a command context suitable for use as an AsyncCompletion in a message */
+    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
+
+    /* A list of commands that are pending completion.  These commands are
+     * awaiting some set of asynchronous operations to finish (eg: store,
+     * flow-control, etc.) before the command can be completed to the client
+     */
+    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
+    qpid::sys::Mutex incompleteCmdsLock;  // locks above container
+
+    /** This context is shared between the SessionState and scheduledCompleter,
+     * holds the sequence numbers of all commands that have completed asynchronously.
+     */
+    class ScheduledCompleterContext {
+    private:
+        std::list<SequenceNumber> completedCmds;
+        // ordering: take this lock first, then incompleteCmdsLock
+        qpid::sys::Mutex completedCmdsLock;
+        SessionState *session;
+    public:
+        ScheduledCompleterContext(SessionState *s) : session(s) {};
+        bool scheduleCompletion(SequenceNumber cmd);
+        void completeCommands();
+        void cancel();
+    };
+    boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
+
+    /** The following method runs in the IO thread and completes commands that
+     * were finished asynchronously.
+     */
+    static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
+
     friend class SessionManager;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Feb 19 15:03:16 2011
@@ -36,28 +36,28 @@
  *
  * IMPORTANT NOTE: any time code is added to the broker that uses timers,
  * the cluster may need to be updated to take account of this.
- * 
+ *
  *
  * USE OF TIMESTAMPS IN THE BROKER
- *  
+ *
  * The following are the current areas where broker uses timers or timestamps:
- * 
+ *
  * - Producer flow control: broker::SemanticState uses
  *   connection::getClusterOrderOutput.  a FrameHandler that sends
  *   frames to the client via the cluster. Used by broker::SessionState
- *   
+ *
  * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
  *   implemented by cluster::ExpiryPolicy.
- * 
+ *
  * - Connection heartbeat: sends connection controls, not part of
  *   session command counting so OK to ignore.
- * 
+ *
  * - LinkRegistry: only cluster elder is ever active for links.
- * 
+ *
  * - management::ManagementBroker: uses MessageHandler supplied by  cluster
  *   to send messages to the broker via the cluster.
- *   
- * - Dtx: not yet supported with cluster.  
+ *
+ * - Dtx: not yet supported with cluster.
  *
  * cluster::ExpiryPolicy implements the strategy for message expiry.
  *
@@ -65,16 +65,16 @@
  * Used for periodic management events.
  *
  * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- * 
+ *
  * Messages sent to/from CPG are called Events.
  *
  * An Event carries a ConnectionId, which includes a MemberId and a
  * connection number.
- * 
+ *
  * Events are either
  *  - Connection events: non-0 connection number and are associated with a connection.
  *  - Cluster Events: 0 connection number, are not associated with a connection.
- * 
+ *
  * Events are further categorized as:
  *  - Control: carries method frame(s) that affect cluster behavior.
  *  - Data: carries raw data received from a client connection.
@@ -214,7 +214,7 @@ struct ClusterDispatcher : public framin
     {
         cluster.initialStatus(
             member, version, active, clusterId,
-            framing::cluster::StoreState(storeState), shutdownId, 
+            framing::cluster::StoreState(storeState), shutdownId,
             firstConfig, l);
     }
     void ready(const std::string& url) {
@@ -244,7 +244,7 @@ struct ClusterDispatcher : public framin
 };
 
 Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
-    settings(set), 
+    settings(set),
     broker(b),
     mgmtObject(0),
     poller(b.getPoller()),
@@ -279,6 +279,8 @@ Cluster::Cluster(const ClusterSettings& 
     updateClosed(false),
     error(*this)
 {
+    broker.setInCluster(true);
+
     // We give ownership of the timer to the broker and keep a plain pointer.
     // This is OK as it means the timer has the same lifetime as the broker.
     timer = new ClusterTimer(*this);
@@ -299,7 +301,7 @@ Cluster::Cluster(const ClusterSettings& 
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        clusterId = store.getClusterId(); 
+        clusterId = store.getClusterId();
         QPID_LOG(notice, "Cluster store state: " << store)
             }
     cpg.join(name);
@@ -360,14 +362,14 @@ void Cluster::addShadowConnection(const 
     // Safe to use connections here because we're pre-catchup, stalled
     // and discarding, so deliveredFrame is not processing any
     // connection events.
-    assert(discarding);         
+    assert(discarding);
     pair<ConnectionMap::iterator, bool> ib
         = connections.insert(ConnectionMap::value_type(c->getId(), c));
     assert(ib.second);
 }
 
 void Cluster::erase(const ConnectionId& id) {
-    Lock l(lock);    
+    Lock l(lock);
     erase(id,l);
 }
 
@@ -393,9 +395,9 @@ std::vector<Url> Cluster::getUrls() cons
 
 std::vector<Url> Cluster::getUrls(Lock&) const {
     return map.memberUrls();
-} 
+}
 
-void Cluster::leave() { 
+void Cluster::leave() {
     Lock l(lock);
     leave(l);
 }
@@ -405,7 +407,7 @@ void Cluster::leave() { 
         QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
     } do {} while(0)
 
-void Cluster::leave(Lock&) { 
+void Cluster::leave(Lock&) {
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -424,7 +426,7 @@ void Cluster::deliver(
     uint32_t nodeid,
     uint32_t pid,
     void* msg,
-    int msg_len) 
+    int msg_len)
 {
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -455,7 +457,7 @@ void Cluster::deliveredEvent(const Event
         EventFrame ef(e, e.getFrame());
         // Stop the deliverEventQueue on update offers.
         // This preserves the connection decoder fragments for an update.
-        // Only do this for the two brokers that are directly involved in this 
+        // Only do this for the two brokers that are directly involved in this
         // offer: the one making the offer, or the one receiving it.
         const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
         if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -465,7 +467,7 @@ void Cluster::deliveredEvent(const Event
         }
         deliverFrame(ef);
     }
-    else if(!discarding) { 
+    else if(!discarding) {
         if (e.isControl())
             deliverFrame(EventFrame(e, e.getFrame()));
         else {
@@ -507,7 +509,7 @@ void Cluster::deliveredFrame(const Event
         // the event queue.
         e.frame = AMQFrame(
             ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
-        deliverEventQueue.start(); 
+        deliverEventQueue.start();
     }
     // Process each frame through the error checker.
     if (error.isUnresolved()) {
@@ -515,7 +517,7 @@ void Cluster::deliveredFrame(const Event
         while (error.canProcess())  // There is a frame ready to process.
             processFrame(error.getNext(), l);
     }
-    else 
+    else
         processFrame(e, l);
 }
 
@@ -577,7 +579,7 @@ Cluster::ConnectionVector Cluster::getCo
 }
 
 // CPG config-change callback.
-void Cluster::configChange ( 
+void Cluster::configChange (
     cpg_handle_t /*handle*/,
     const cpg_name */*group*/,
     const cpg_address *members, int nMembers,
@@ -607,7 +609,7 @@ void Cluster::setReady(Lock&) {
 }
 
 // Set the management status from the Cluster::state.
-// 
+//
 // NOTE: Management updates are sent based on property changes.  In
 // order to keep consistency across the cluster, we touch the local
 // management status property even if it is locally unchanged for any
@@ -618,7 +620,7 @@ void Cluster::setMgmtStatus(Lock&) {
 }
 
 void Cluster::initMapCompleted(Lock& l) {
-    // Called on completion of the initial status map.    
+    // Called on completion of the initial status map.
     QPID_LOG(debug, *this << " initial status map complete. ");
     setMgmtStatus(l);
     if (state == PRE_INIT) {
@@ -701,8 +703,8 @@ void Cluster::configChange(const MemberI
     if (initMap.isResendNeeded()) {
         mcast.mcastControl(
             ClusterInitialStatusBody(
-                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
-                store.getState(), store.getShutdownId(), 
+                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+                store.getState(), store.getShutdownId(),
                 initMap.getFirstConfigStr()
             ),
             self);
@@ -759,7 +761,7 @@ std::string Cluster::debugSnapshot() {
 // point we know the poller has stopped so no poller callbacks will be
 // invoked. We must ensure that CPG has also shut down so no CPG
 // callbacks will be invoked.
-// 
+//
 void Cluster::brokerShutdown()  {
     sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     try { cpg.shutdown(); }
@@ -775,7 +777,7 @@ void Cluster::updateRequest(const Member
 }
 
 void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
-                            const framing::Uuid& id, 
+                            const framing::Uuid& id,
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
                             const std::string& firstConfig,
@@ -969,7 +971,7 @@ void Cluster::updateOutDone(Lock& l) {
 
 void Cluster::updateOutError(const std::exception& e)  {
     Monitor::ScopedLock l(lock);
-    QPID_LOG(error, *this << " error sending update: " << e.what());    
+    QPID_LOG(error, *this << " error sending update: " << e.what());
     updateOutDone(l);
 }
 
@@ -1067,7 +1069,7 @@ void Cluster::memberUpdate(Lock& l) {
 void Cluster::updateMgmtMembership(Lock& l) {
     if (!mgmtObject) return;
     std::vector<Url> urls = getUrls(l);
-    mgmtObject->set_clusterSize(urls.size()); 
+    mgmtObject->set_clusterSize(urls.size());
     string urlstr;
     for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
         if (i != urls.begin()) urlstr += ";";

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sat Feb 19 15:03:16 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -53,7 +53,7 @@ class EventHeader {
 
     /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
-    /** Size of header + payload. */ 
+    /** Size of header + payload. */
     size_t getStoreSize() const { return size + HEADER_SIZE; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
 
   protected:
     static const size_t HEADER_SIZE;
-    
+
     EventType type;
     ConnectionId connectionId;
     size_t size;
@@ -86,7 +86,7 @@ class Event : public EventHeader {
 
     /** Create a control event. */
     static Event control(const framing::AMQFrame&, const ConnectionId&);
-    
+
     // Data excluding header.
     char* getData() { return store + HEADER_SIZE; }
     const char* getData() const { return store + HEADER_SIZE; }
@@ -95,12 +95,12 @@ class Event : public EventHeader {
     char* getStore() { return store; }
     const char* getStore() const { return store; }
 
-    const framing::AMQFrame& getFrame() const;        
-    
+    const framing::AMQFrame& getFrame() const;
+
     operator framing::Buffer() const;
 
     iovec toIovec() const;
-    
+
   private:
     void encodeHeader() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Sat Feb 19 15:03:16 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
 
 
     ConnectionId connectionId;
-    framing::AMQFrame frame;   
+    framing::AMQFrame frame;
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1072333

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Sat Feb 19 15:03:16 2011
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
 
 bool isClusterSafe() { return !inCluster || inContext; }
 
-bool isCluster() { return inCluster; }
-
 void assertClusterSafe()  {
     if (!isClusterSafe()) {
         QPID_LOG(critical, "Modified cluster state outside of cluster context");

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h Sat Feb 19 15:03:16 2011
@@ -52,9 +52,6 @@ QPID_COMMON_EXTERN void assertClusterSaf
  */
 QPID_COMMON_EXTERN bool isClusterSafe();
 
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
 /**
  * Base class for classes that encapsulate state which is replicated
  * to all members of a cluster. Acts as a marker for clustered state

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Sat Feb 19 15:03:16 2011
@@ -107,7 +107,6 @@ set(unit_tests_to_build
     MessagingSessionTests
     SequenceSet
     StringUtils
-    IncompleteMessageList
     RangeSet
     AtomicValue
     QueueTest
@@ -119,6 +118,7 @@ set(unit_tests_to_build
     MessageTest
     QueueRegistryTest
     QueuePolicyTest
+    QueueFlowLimitTest
     FramingTest
     HeaderTest
     SequenceNumberTest

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Sat Feb 19 15:03:16 2011
@@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	InlineVector.cpp \
 	SequenceSet.cpp \
 	StringUtils.cpp \
-	IncompleteMessageList.cpp \
 	RangeSet.cpp \
 	AtomicValue.cpp \
 	QueueTest.cpp \
@@ -99,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	MessageTest.cpp \
 	QueueRegistryTest.cpp \
 	QueuePolicyTest.cpp \
+	QueueFlowLimitTest.cpp \
 	FramingTest.cpp \
 	HeaderTest.cpp \
 	SequenceNumberTest.cpp \
@@ -310,7 +310,9 @@ TESTS_ENVIRONMENT = \
     $(srcdir)/run_test 
 
 system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
+  run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
+  run_queue_flow_limit_tests
 
 EXTRA_DIST +=								\
   run_test vg_check							\
@@ -349,7 +351,8 @@ EXTRA_DIST +=								\
   run_test.ps1								\
   start_broker.ps1							\
   stop_broker.ps1							\
-  topictest.ps1
+  topictest.ps1                                                         \
+  run_queue_flow_limit_tests
 
 check_LTLIBRARIES += libdlclose_noop.la
 libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)

Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Sat Feb 19 15:03:16 2011
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
@@ -28,6 +29,17 @@ using namespace qpid;
 using namespace broker;
 using namespace framing;
 
+namespace {
+    class DummyCompletion : public AsyncCompletion
+    {
+  public:
+        DummyCompletion() {}
+        virtual ~DummyCompletion() {}
+  protected:
+        void completed(bool) {}
+    };
+}
+
 namespace qpid {
 namespace tests {
 
@@ -50,6 +62,8 @@ struct MessageUtils
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
+        boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+        msg->setIngressCompletion(dc);
         return msg;
     }
 

Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Sat Feb 19 15:03:16 2011
@@ -23,6 +23,7 @@
 #include "test_tools.h"
 
 #include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -38,6 +39,7 @@ namespace tests {
 
 QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
 
+namespace {
 QueuedMessage createMessage(uint32_t size)
 {
     QueuedMessage msg;
@@ -45,7 +47,7 @@ QueuedMessage createMessage(uint32_t siz
     MessageUtils::addContent(msg.payload, std::string (size, 'x'));
     return msg;
 }
-
+}
 
 QPID_AUTO_TEST_CASE(testCount)
 {
@@ -340,6 +342,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
     //fallback to rejecting messages
     QueueOptions args;
     args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+    // Disable flow control, or else we'll never hit the max limit
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
     ProxySessionFixture f;
     std::string q("my-queue");

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Sat Feb 19 15:03:16 2011
@@ -36,6 +36,9 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+
 #include <iostream>
 #include "boost/format.hpp"
 
@@ -85,6 +88,8 @@ intrusive_ptr<Message> create_message(st
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+    msg->setIngressCompletion(dc);
     return msg;
 }
 
@@ -508,6 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     client::QueueOptions args;
     // set queue mode
     args.setOrdering(client::LVQ);
+    // disable flow control, as this test violates the enqueue/dequeue sequence.
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);

Modified: qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Sat Feb 19 15:03:16 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
     BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
-    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
 }
 
 QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
     intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
 
-    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
+    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
     BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
 
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Sat Feb 19 15:03:16 2011
@@ -29,6 +29,7 @@ from unittest import TestCase
 from copy import copy
 from threading import Thread, Lock, Condition
 from logging import getLogger
+import qmf.console
 
 log = getLogger("qpid.brokertest")
 
@@ -327,6 +328,10 @@ class Broker(Popen):
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
         self._log_ready = False
 
+    def startQmf(self, handler=None):
+        self.qmf_session = qmf.console.Session(handler)
+        self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
     def host(self): return self._host
 
     def port(self):

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1072356&r1=1072355&r2=1072356&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Sat Feb 19 15:03:16 2011
@@ -23,7 +23,7 @@ from qpid import datatypes, messaging
 from brokertest import *
 from qpid.harness import Skipped
 from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from threading import Thread, Lock, Condition
 from logging import getLogger
 from itertools import chain
 from tempfile import NamedTemporaryFile
@@ -304,6 +304,113 @@ acl allow all all
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
+    class BlockedSend(Thread):
+        """Send a message; the send is expected to block.
+        Verify that it does block (for a given timeout), then allow
+        waiting until it unblocks when it is expected to do so."""
+        def __init__(self, sender, msg):
+            self.sender, self.msg = sender, msg
+            self.blocked = True
+            self.condition = Condition()
+            self.timeout = 0.1    # Time to wait for expected results.
+            Thread.__init__(self)
+        def run(self):
+            try:
+                self.sender.send(self.msg)
+                self.condition.acquire()
+                try:
+                    self.blocked = False
+                    self.condition.notify()
+                finally: self.condition.release()
+            except Exception,e: print "BlockedSend exception: %s"%e
+        def start(self):
+            Thread.start(self)
+            time.sleep(self.timeout)
+            assert self.blocked         # Expected to block
+        def assert_blocked(self): assert self.blocked
+        def wait(self):                 # Now expecting to unblock
+            self.condition.acquire()
+            try:
+                while self.blocked:
+                    self.condition.wait(self.timeout)
+                    if self.blocked: raise Exception("Timed out waiting for send to unblock")
+            finally: self.condition.release()
+            self.join()
+
+    def queue_flowlimit_test(self, brokers):
+        """Verify that the queue's flowlimit configuration and state are
+        correctly replicated.
+        The brokers argument allows this test to run on a single broker,
+        a cluster of 2 pre-started brokers, or a cluster where the second
+        broker starts after the queue is in flow control.
+        """
+        # configure a queue with a specific flow limit on first broker
+        ssn0 = brokers.first().connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        brokers.first().startQmf()
+        q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q.getObjectId()
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert not q.flowStopped
+
+        # fill the queue on one broker until flow control is active
+        for x in range(5): s0.send(Message(str(x)))
+        sender = ShortTests.BlockedSend(s0, Message(str(6)))
+        sender.start()                  # Tests that sender does block
+        # Verify the broker queue goes into a flowStopped state
+        deadline = time.time() + 1
+        while not q.flowStopped and time.time() < deadline: q.update()
+        assert q.flowStopped
+        sender.assert_blocked()         # Still blocked
+
+        # Now verify that both brokers in the cluster have the same configuration
+        brokers.second().startQmf()
+        qs = brokers.second().qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q = qs[0]
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert q.flowStopped
+
+        # now drain the queue using a session to the other broker
+        ssn1 = brokers.second().connect().session()
+        r1 = ssn1.receiver("flq", capacity=6)
+        for x in range(4):
+            r1.fetch(timeout=0)
+            ssn1.acknowledge()
+        sender.wait()                   # Verify no longer blocked.
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
+    def test_queue_flowlimit(self):
+        """Test flow limits on a standalone broker"""
+        broker = self.broker()
+        class Brokers:
+            def first(self): return broker
+            def second(self): return broker
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_cluster(self):
+        return          # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+        cluster = self.cluster(2)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self): return cluster[1]
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_cluster_join(self):
+        return          # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+        cluster = self.cluster(1)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self):
+                if len(cluster) == 1: cluster.start()
+                return cluster[1]
+        self.queue_flowlimit_test(Brokers())
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py:1061302-1072333

Propchange: qpid/trunk/qpid/dotnet/build-msbuild.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/dotnet/build-msbuild.bat:1061302-1072333

Propchange: qpid/trunk/qpid/dotnet/build-nant-release
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/dotnet/build-nant-release:1061302-1072333

Propchange: qpid/trunk/qpid/dotnet/build-nant.bat
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -0,0 +1 @@
+/qpid/branches/qpid-2935/qpid/dotnet/build-nant.bat:1061302-1072333

Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -2,4 +2,5 @@
 /qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761,894875,916304,916325,930288,931179
 /qpid/branches/java-broker-0-10/qpid/java:795950-829653
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
+/qpid/branches/qpid-2935/qpid/java:1061302-1072333
 /qpid/trunk/qpid:796646-796653

Propchange: qpid/trunk/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -2,4 +2,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
+/qpid/branches/qpid-2935/qpid/java/broker:1061302-1072333
 /qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Propchange: qpid/trunk/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -1,4 +1,5 @@
 /qpid/branches/0.5-release/qpid/java/broker/bin:757268
 /qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
+/qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
 /qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -3,4 +3,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -3,4 +3,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 19 15:03:16 2011
@@ -1 +1,2 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org