You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/16 19:58:06 UTC

svn commit: r1071360 - in /qpid/branches/qpid-2935/qpid/cpp/src: qpid/broker/ tests/

Author: kgiusti
Date: Wed Feb 16 18:58:06 2011
New Revision: 1071360

URL: http://svn.apache.org/viewvc?rev=1071360&view=rev
Log:
QPID-2935: move completion from msg to session cmd context.

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/MessageUtils.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/AsyncCompletion.h Wed Feb 16 18:58:06 2011
@@ -1,5 +1,5 @@
-#ifndef _Completion_
-#define _Completion_
+#ifndef _AsyncCompletion_
+#define _AsyncCompletion_
 
 /*
  *
@@ -26,148 +26,145 @@
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Monitor.h"
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
 
 namespace qpid {
-    namespace broker {
+namespace broker {
 
-        /**
-         * Class to implement asynchronous notification of completion.
-         *
-         * Use-case: An "initiator" needs to wait for a set of "completers" to
-         * finish a unit of work before an action can occur.  This object
-         * tracks the progress of the set of completers, and allows the action
-         * to occur once all completers have signalled that they are done.
-         *
-         * The initiator and completers may be running in separate threads.
-         *
-         * The initiating thread is the thread that initiates the action,
-         * i.e. the connection read thread.
-         *
-         * A completing thread is any thread that contributes to completion,
-         * e.g. a store thread that does an async write.
-         * There may be zero or more completers.
-         *
-         * When the work is complete, a callback is invoked.  The callback
-         * may be invoked in the Initiator thread, or one of the Completer
-         * threads. The callback is passed a flag indicating whether or not
-         * the callback is running under the context of the Initiator thread.
-         *
-         * Use model:
-         * 1) Initiator thread invokes begin()
-         * 2) After begin() has been invoked, zero or more Completers invoke
-         * startCompleter().  Completers may be running in the same or
-         * different thread as the Initiator, as long as they guarantee that
-         * startCompleter() is invoked at least once before the Initiator invokes end().
-         * 3) Completers may invoke finishCompleter() at any time, even after the
-         * initiator has invoked end().  finishCompleter() may be called from any
-         * thread.
-         * 4) startCompleter()/finishCompleter() calls "nest": for each call to
-         * startCompleter(), a corresponding call to finishCompleter() must be made.
-         * Once the last finishCompleter() is called, the Completer must no longer
-         * reference the completion object.
-         * 5) The Initiator invokes end() at the point where it has finished
-         * dispatching work to the Completers, and is prepared for the callback
-         * handler to be invoked. Note: if there are no outstanding Completers
-         * pending when the Initiator invokes end(), the callback will be invoked
-         * directly, and the sync parameter will be set true. This indicates to the
-         * Initiator that the callback is executing in the context of the end() call,
-         * and the Initiator is free to optimize the handling of the completion,
-         * assuming no need for synchronization with Completer threads.
-         */
-        class AsyncCompletion {
-      public:
-            // encapsulates the completion callback handler
-            class CompletionHandler {
-          public:
-                virtual void operator() (bool) { /* bool == true if called via end() */}
-            };
-
-      private:
-            mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
-            mutable qpid::sys::Monitor callbackLock;
-            bool inCallback;
-            void invokeCallback(bool sync) {
-                qpid::sys::Mutex::ScopedLock l(callbackLock);
-                inCallback = true;
-                if (handler) {
-                    boost::shared_ptr<CompletionHandler> tmp;
-                    tmp.swap(handler);
-                    {
-                        qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
-                        (*tmp)(sync);
-                    }
-                }
-                inCallback = false;
-                callbackLock.notifyAll();
-            }
-
-      protected:
-            /** Invoked when all completers have signalled that they have completed
-             * (via calls to finishCompleter()).
-             */
-            boost::shared_ptr<CompletionHandler> handler;
-
-      public:
-      AsyncCompletion() : completionsNeeded(0), inCallback(false) {};
-            virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() == 0); */ };
-
-            /** True when all outstanding operations have compeleted
-             */
-            bool isDone()
-            {
-                qpid::sys::Mutex::ScopedLock l(callbackLock);
-                return !inCallback && completionsNeeded.get() == 0;
-            }
-
-            /** Called to signal the start of an asynchronous operation.  The operation
-             * is considered pending until finishCompleter() is called.
-             * E.g. called when initiating an async store operation.
-             */
-            void startCompleter() { ++completionsNeeded; }
-
-            /** Called by completer to signal that it has finished the operation started
-             * when startCompleter() was invoked.
-             * e.g. called when async write complete.
-             */
-            void finishCompleter()
-            {
-                if (--completionsNeeded == 0) {
-                    invokeCallback(false);
-                }
-            }
+/**
+ * Class to implement asynchronous notification of completion.
+ *
+ * Use-case: An "initiator" needs to wait for a set of "completers" to
+ * finish a unit of work before an action can occur.  This object
+ * tracks the progress of the set of completers, and allows the action
+ * to occur once all completers have signalled that they are done.
+ *
+ * The initiator and completers may be running in separate threads.
+ *
+ * The initiating thread is the thread that initiates the action,
+ * i.e. the connection read thread.
+ *
+ * A completing thread is any thread that contributes to completion,
+ * e.g. a store thread that does an async write.
+ * There may be zero or more completers.
+ *
+ * When the work is complete, a callback is invoked.  The callback
+ * may be invoked in the Initiator thread, or one of the Completer
+ * threads. The callback is passed a flag indicating whether or not
+ * the callback is running under the context of the Initiator thread.
+ *
+ * Use model:
+ * 1) Initiator thread invokes begin()
+ * 2) After begin() has been invoked, zero or more Completers invoke
+ * startCompleter().  Completers may be running in the same or
+ * different thread as the Initiator, as long as they guarantee that
+ * startCompleter() is invoked at least once before the Initiator invokes end().
+ * 3) Completers may invoke finishCompleter() at any time, even after the
+ * initiator has invoked end().  finishCompleter() may be called from any
+ * thread.
+ * 4) startCompleter()/finishCompleter() calls "nest": for each call to
+ * startCompleter(), a corresponding call to finishCompleter() must be made.
+ * Once the last finishCompleter() is called, the Completer must no longer
+ * reference the completion object.
+ * 5) The Initiator invokes end() at the point where it has finished
+ * dispatching work to the Completers, and is prepared for the callback
+ * handler to be invoked. Note: if there are no outstanding Completers
+ * pending when the Initiator invokes end(), the callback will be invoked
+ * directly, and the sync parameter will be set true. This indicates to the
+ * Initiator that the callback is executing in the context of the end() call,
+ * and the Initiator is free to optimize the handling of the completion,
+ * assuming no need for synchronization with Completer threads.
+ */
 
-            /** called by initiator before any calls to startCompleter can be done.
-             */
-            void begin() { startCompleter(); };
-
-            /** called by initiator after all potential completers have called
-             * startCompleter().
-             */
-            void end(boost::shared_ptr<CompletionHandler> _handler)
+class AsyncCompletion
+{
+ private:
+    mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
+    mutable qpid::sys::Monitor callbackLock;
+    bool inCallback, active;
+
+    void invokeCallback(bool sync) {
+        qpid::sys::Mutex::ScopedLock l(callbackLock);
+        if (active) {
+            inCallback = true;
             {
-                assert(completionsNeeded.get() > 0);    // ensure begin() has been called!
-                handler = _handler;
-                if (--completionsNeeded == 0) {
-                    invokeCallback(true);
-                }
-            }
-
-            /** may be called by Initiator to cancel the callback registered by end()
-             */
-            void cancel() {
-                qpid::sys::Mutex::ScopedLock l(callbackLock);
-                while (inCallback) callbackLock.wait();
-                handler.reset();
+                qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+                completed(sync);
             }
+            inCallback = false;
+            active = false;
+            callbackLock.notifyAll();
+        }
+    }
+
+ protected:
+    /** Invoked when all completers have signalled that they have completed
+     * (via calls to finishCompleter()). bool == true if called via end()
+     */
+    virtual void completed(bool) = 0;
+
+ public:
+    AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {};
+    virtual ~AsyncCompletion() { cancel(); }
+
+    /** True when all outstanding operations have compeleted
+     */
+    bool isDone()
+    {
+        qpid::sys::Mutex::ScopedLock l(callbackLock);
+        return !active;
+    }
+
+    /** Called to signal the start of an asynchronous operation.  The operation
+     * is considered pending until finishCompleter() is called.
+     * E.g. called when initiating an async store operation.
+     */
+    void startCompleter() { ++completionsNeeded; }
+
+    /** Called by completer to signal that it has finished the operation started
+     * when startCompleter() was invoked.
+     * e.g. called when async write complete.
+     */
+    void finishCompleter()
+    {
+        if (--completionsNeeded == 0) {
+            invokeCallback(false);
+        }
+    }
+
+    /** called by initiator before any calls to startCompleter can be done.
+     */
+    void begin()
+    {
+        qpid::sys::Mutex::ScopedLock l(callbackLock);
+        ++completionsNeeded;
+    }
+
+    /** called by initiator after all potential completers have called
+     * startCompleter().
+     */
+    void end()
+    {
+        assert(completionsNeeded.get() > 0);    // ensure begin() has been called!
+        if (--completionsNeeded == 0) {
+            invokeCallback(true);
+        }
+    }
+
+    /** may be called by Initiator to cancel the callback.  Will wait for
+     * callback to complete if in progress.
+     */
+    virtual void cancel() {
+        qpid::sys::Mutex::ScopedLock l(callbackLock);
+        while (inCallback) callbackLock.wait();
+        active = false;
+    }
+
+    /** may be called by Initiator after all completers have been added but
+     * prior to calling end().  Allows initiator to determine if it _really_
+     * needs to wait for pending Completers (e.g. count > 1).
+     */
+    //uint32_t getPendingCompleters() { return completionsNeeded.get(); }
+};
 
-            /** may be called by Initiator after all completers have been added but
-             * prior to calling end().  Allows initiator to determine if it _really_
-             * needs to wait for pending Completers (e.g. count > 1).
-             */
-            uint32_t getPendingCompleters() { return completionsNeeded.get(); }
-        };
-
-    }}  // qpid::broker::
-#endif  /*!_Completion_*/
+}}  // qpid::broker::
+#endif  /*!_AsyncCompletion_*/

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 16 18:58:06 2011
@@ -60,6 +60,7 @@
 #include "qpid/StringUtils.h"
 #include "qpid/Url.h"
 #include "qpid/Version.h"
+#include "qpid/sys/ClusterSafe.h"
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
@@ -224,12 +225,19 @@ Broker::Broker(const Broker::Options& co
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
-    QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
     queues.setQueueEvents(&queueEvents);
 
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
+    /** todo KAG - remove once cluster support for flow control done + (and ClusterSafe.h include above). */
+    if (sys::isCluster()) {
+        QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
+        QueueFlowLimit::setDefaults(0, 0, 0);
+    } else {
+        QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+    }
+
     // If no plugin store module registered itself, set up the null store.
     if (NullMessageStore::isNullStore(store.get())) 
         setStore();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Feb 16 18:58:06 2011
@@ -48,14 +48,15 @@ class PersistableMessage : public Persis
     sys::Mutex storeLock;
 
     /**
+     * "Ingress" messages == messages sent _to_ the broker.
      * Tracks the number of outstanding asynchronous operations that must
-     * complete before the message can be considered safely received by the
+     * complete before an inbound message can be considered fully received by the
      * broker.  E.g. all enqueues have completed, the message has been written
      * to store, credit has been replenished, etc. Once all outstanding
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    AsyncCompletion receiveCompletion;
+    boost::shared_ptr<AsyncCompletion> ingressCompletion;
 
     /**
      * Tracks the number of outstanding asynchronous dequeue
@@ -113,9 +114,13 @@ class PersistableMessage : public Persis
 
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
-    QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); }
-    QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); }
-    QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); }
+    /** track the progress of a message received by the broker - see ingressCompletion above */
+    QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); }
+    QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; }
+    QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; }
+
+    QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); }
+    QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); }
 
     QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
                                          MessageStore* _store);
@@ -131,8 +136,6 @@ class PersistableMessage : public Persis
     bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
     
     void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store);
-
-    QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion; }
 };
 
 }}

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Wed Feb 16 18:58:06 2011
@@ -686,7 +686,7 @@ uint32_t Queue::getEnqueueCompleteMessag
         //NOTE: don't need to use checkLvqReplace() here as it
         //is only relevant for LVQ which does not support persistence
         //so the enqueueComplete check has no effect
-        if ( i->payload->isReceiveComplete() ) count ++;
+        if ( i->payload->isIngressComplete() ) count ++;
     }
     
     return count;
@@ -1219,6 +1219,12 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
+const Broker* Queue::getBroker()
+{
+    return broker;
+}
+
+
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h Wed Feb 16 18:58:06 2011
@@ -362,6 +362,8 @@ class Queue : public boost::enable_share
     void recoverPrepared(boost::intrusive_ptr<Message>& msg);
 
     void flush();
+
+    const Broker* getBroker();
 };
 }
 }

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Feb 16 18:58:06 2011
@@ -19,12 +19,15 @@
  *
  */
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/sys/ClusterSafe.h"
 
 #include "qmf/org/apache/qpid/broker/Queue.h"
 
@@ -92,7 +95,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
     : queue(_queue), queueName("<unknown>"),
       flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
-      flowStopped(false), count(0), size(0), queueMgmtObj(0)
+      flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
 {
     uint32_t maxCount(0);
     uint64_t maxSize(0);
@@ -103,6 +106,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
             maxSize = _queue->getPolicy()->getMaxSize();
             maxCount = _queue->getPolicy()->getMaxCount();
         }
+        broker = queue->getBroker();
     }
     validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
     validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
@@ -140,7 +144,13 @@ void QueueFlowLimit::enqueued(const Queu
     }
 
     if (flowStopped || !index.empty()) {
-        msg.payload->getReceiveCompletion().startCompleter();    // don't complete until flow resumes
+        // ignore flow control if we are populating the queue due to cluster replication:
+        if (broker && broker->isClusterUpdatee()) {
+            QPID_LOG(error, "KAG: Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+            return;
+        }
+        QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
+        msg.payload->getIngressCompletion()->startCompleter();    // don't complete until flow resumes
         index.insert(msg.payload);
     }
 }
@@ -180,14 +190,14 @@ void QueueFlowLimit::dequeued(const Queu
             // flow enabled - release all pending msgs
             while (!index.empty()) {
                 std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
-                (*itr)->getReceiveCompletion().finishCompleter();
+                (*itr)->getIngressCompletion()->finishCompleter();
                 index.erase(itr);
             }
         } else {
             // even if flow controlled, we must release this msg as it is being dequeued
             std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
             if (itr != index.end()) {       // this msg is flow controlled, release it:
-                (*itr)->getReceiveCompletion().finishCompleter();
+                (*itr)->getIngressCompletion()->finishCompleter();
                 index.erase(itr);
             }
         }
@@ -195,6 +205,35 @@ void QueueFlowLimit::dequeued(const Queu
 }
 
 
+/** used by clustering: is the given message's completion blocked due to flow
+ * control?  True if message is blocked. (for the clustering updater: done
+ * after msgs have been replicated to the updatee).
+ */
+bool QueueFlowLimit::getState(const QueuedMessage& msg) const
+{
+    sys::Mutex::ScopedLock l(indexLock);
+    return (index.find(msg.payload) != index.end());
+}
+
+
+/** artificially force the flow control state of a given message
+ * (for the clustering updatee: done after msgs have been replicated to
+ * the updatee's queue)
+ */
+void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
+{
+    if (blocked && msg.payload) {
+
+        sys::Mutex::ScopedLock l(indexLock);
+        assert(index.find(msg.payload) == index.end());
+
+        QPID_LOG(error, "KAG TBD!!!: Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
+        // KAG TBD!!!
+        index.insert(msg.payload);
+    }
+}
+
+
 void QueueFlowLimit::setManagementObject(_qmfBroker::Queue *mgmtObject)
 {
     queueMgmtObj = mgmtObject;
@@ -284,6 +323,14 @@ std::auto_ptr<QueueFlowLimit> QueueFlowL
         if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow control
             return std::auto_ptr<QueueFlowLimit>();
         }
+        /** todo KAG - remove once cluster support for flow control done. */
+        if (sys::isCluster()) {
+            if (queue) {
+                QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+                         << queue->getName());
+            }
+            return std::auto_ptr<QueueFlowLimit>();
+        }
         return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
                                                                 flowStopSize, flowResumeSize));
     }
@@ -293,6 +340,15 @@ std::auto_ptr<QueueFlowLimit> QueueFlowL
         uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
         uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
 
+        /** todo KAG - remove once cluster support for flow control done. */
+        if (sys::isCluster()) {
+            if (queue) {
+                QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+                         << queue->getName());
+            }
+            return std::auto_ptr<QueueFlowLimit>();
+        }
+
         return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize));
     }
     return std::auto_ptr<QueueFlowLimit>();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Wed Feb 16 18:58:06 2011
@@ -43,6 +43,8 @@ namespace _qmfBroker = qmf::org::apache:
 namespace qpid {
 namespace broker {
 
+class Broker;
+
 /**
  * Producer flow control: when level is > flowStop*, flow control is ON.
  * then level is < flowResume*, flow control is OFF.  If == 0, flow control
@@ -82,6 +84,11 @@ class QueueFlowLimit
     /** the queue has removed QueuedMessage.  Returns true if flow state changes */
     void dequeued(const QueuedMessage&);
 
+    /** for clustering: */
+    /** true if the given message is flow controlled, and cannot be completed. */
+    bool getState(const QueuedMessage&) const;
+    void setState(const QueuedMessage&, bool blocked);
+
     uint32_t getFlowStopCount() const { return flowStopCount; }
     uint32_t getFlowResumeCount() const { return flowResumeCount; }
     uint64_t getFlowStopSize() const { return flowStopSize; }
@@ -103,10 +110,12 @@ class QueueFlowLimit
  protected:
     // msgs waiting for flow to become available.
     std::set< boost::intrusive_ptr<Message> > index;
-    qpid::sys::Mutex indexLock;
+    mutable qpid::sys::Mutex indexLock;
 
     _qmfBroker::Queue *queueMgmtObj;
 
+    const Broker *broker;
+
     QueueFlowLimit(Queue *queue,
                    uint32_t flowStopCount, uint32_t flowResumeCount,
                    uint64_t flowStopSize,  uint64_t flowResumeSize);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Feb 16 18:58:06 2011
@@ -61,7 +61,7 @@ SessionState::SessionState(
       msgBuilder(&broker.getStore()),
       mgmtObject(0),
       rateFlowcontrol(0),
-      scheduledRcvMsgs(new IncompleteRcvMsg::deque)
+      scheduledCmds(new std::list<SequenceNumber>)
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -95,20 +95,22 @@ SessionState::~SessionState() {
     if (flowControlTimer)
         flowControlTimer->cancel();
 
-    // clean up any outstanding incomplete receive messages
-    qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
-    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs);
-    incompleteRcvMsgs.clear();
+    // clean up any outstanding incomplete commands
+    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+    incompleteCmds.clear();
     while (!copy.empty()) {
-        boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second);
+        boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
         copy.erase(copy.begin());
         {
             // note: need to drop lock, as callback may attempt to take it.
-            qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+            qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
             ref->cancel();
         }
     }
-    scheduledRcvMsgs->clear();  // no need to lock - shared with IO thread.
+    // At this point, we are guaranteed no further completion callbacks will be
+    // made.
+    scheduledCmds->clear();  // keeps IO thread from running more completions.
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -265,10 +267,12 @@ void SessionState::handleContent(AMQFram
         }
         msg->setPublisher(&getConnection());
 
-        msg->getReceiveCompletion().begin();
+        boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
+        msg->setIngressCompletion( ac );
+        ac->begin();
         semanticState.handle(msg);
         msgBuilder.end();
-        msg->getReceiveCompletion().end( createPendingMsg(msg) );   // allows msg to complete
+        ac->end();  // allows msg to complete xfer
     }
 
     // Handle producer session flow control
@@ -323,13 +327,15 @@ void SessionState::sendAcceptAndCompleti
  * its credit has been accounted for, etc).  At this point, msg is considered
  * by this receiver as 'completed' (as defined by AMQP 0_10)
  */
-void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg)
+void SessionState::completeRcvMsg(SequenceNumber id,
+                                  bool requiresAccept,
+                                  bool requiresSync)
 {
     bool callSendCompletion = false;
-    receiverCompleted(msg->getCommandId());
-    if (msg->requiresAccept())
+    receiverCompleted(id);
+    if (requiresAccept)
         // will cause msg's seq to appear in the next message.accept we send.
-        accepted.add(msg->getCommandId());
+        accepted.add(id);
 
     // Are there any outstanding Execution.Sync commands pending the
     // completion of this msg?  If so, complete them.
@@ -343,7 +349,7 @@ void SessionState::completeRcvMsg(boost:
     }
 
     // if the sender has requested immediate notification of the completion...
-    if (msg->getFrames().getMethod()->isSync()) {
+    if (requiresSync) {
         sendAcceptAndCompletion();
     } else if (callSendCompletion) {
         sendCompletion();
@@ -435,70 +441,83 @@ void SessionState::addPendingExecutionSy
 }
 
 
+/** factory for creating IncompleteIngressMsgXfer objects which
+ * can be references from Messages as ingress AsyncCompletion objects.
+ */
+boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
+SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+{
+    SequenceNumber id = msg->getCommandId();
+    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
+    qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+    incompleteCmds[id] = cmd;
+    return cmd;
+}
+
+
 /** Invoked by the asynchronous completer associated with
  * a received msg that is pending Completion.  May be invoked
  * by the SessionState directly (sync == true), or some external
  * entity (!sync).
  */
-void SessionState::IncompleteRcvMsg::operator() (bool sync)
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
 {
-    QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
-
-    qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
-    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this);
-    if (i != session->incompleteRcvMsgs.end()) {
-        boost::shared_ptr<IncompleteRcvMsg> tmp(i->second);
-        session->incompleteRcvMsgs.erase(i);
-
-        if (session->isAttached()) {
-            if (sync) {
-                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock);
-                QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
-                session->completeRcvMsg(msg);
+    qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+    if (!sync) {
+        // note well: this path may execute in any thread.
+        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+        session->scheduledCmds->push_back(id);
+        if (session->scheduledCmds->size() == 1) {
+            session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+                                                                     session->scheduledCmds,
+                                                                     session));
+        }
+    } else {  // command is being completed in IO thread.
+        // this path runs only on the IO thread.
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+        cmd = session->incompleteCmds.find(id);
+        if (cmd != session->incompleteCmds.end()) {
+            boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+            session->incompleteCmds.erase(cmd);
+
+            if (session->isAttached()) {
+                QPID_LOG(debug, ": receive completed for msg seq=" << id);
+                qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+                session->completeRcvMsg(id, requiresAccept, requiresSync);
                 return;
-            } else {    // potentially called from a different thread
-                QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
-                session->scheduledRcvMsgs->push_back(tmp);
-                if (session->scheduledRcvMsgs->size() == 1) {
-                    session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
-                                                                             session->scheduledRcvMsgs));
-                }
             }
         }
     }
 }
 
 
-/** Scheduled from IncompleteRcvMsg callback, completes all pending message
- * receives asynchronously.
+/** Scheduled from incomplete command's completed callback, safely completes all
+ * completed commands in the IO Thread.  Guaranteed not to be running at the same
+ * time as the message receive code.
  */
-void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs)
+void SessionState::scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > completedCmds,
+                                      SessionState *session)
 {
-    while (!msgs->empty()) {
-        boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front();
-        msgs->pop_front();
-        QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
-        if (iMsg->session && iMsg->session->isAttached()) {
-            QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
-            iMsg->session->completeRcvMsg(iMsg->msg);
+    // when session is destroyed, it clears the list below. If the list is empty,
+    // the passed session pointer is not valid - do nothing.
+    if (completedCmds->empty()) return;
+
+    qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+    std::list<SequenceNumber> cmds(*completedCmds); // make copy so we can drop lock
+    completedCmds->clear();
+
+    while (!cmds.empty()) {
+        SequenceNumber id = cmds.front();
+        cmds.pop_front();
+        std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+
+        cmd = session->incompleteCmds.find(id);
+        if (cmd != session->incompleteCmds.end()) {
+            qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+            cmd->second->do_completion();   // retakes lock
         }
     }
 }
 
 
-/** Cancels a pending incomplete receive message completion callback.  Note
- * well: will wait for the callback to finish if it is currently in progress
- * on another thread.
- */
-void SessionState::IncompleteRcvMsg::cancel()
-{
-    QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId());
-    // Cancel the message complete callback.  On return, we are guaranteed there
-    // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync)
-    msg->getReceiveCompletion().cancel();
-    // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
-    // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg.
-    session = 0;
-}
-
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionState.h Wed Feb 16 18:58:06 2011
@@ -134,7 +134,7 @@ class SessionState : public qpid::Sessio
 
     // indicate that the given ingress msg has been completely received by the
     // broker, and the msg's message.transfer command can be considered completed.
-    void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
+    void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -172,41 +172,65 @@ class SessionState : public qpid::Sessio
     std::queue<SequenceNumber> pendingExecutionSyncs;
     bool currentCommandComplete;
 
-    // A list of ingress messages whose message.transfer command is pending
-    // completion.  These messages are awaiting some set of asynchronous
-    // operations to complete (eg: store, flow-control, etc). before
-    // the message.transfer can be completed.
-    class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+    /** Abstract class that represents a command that is pending
+     * completion.
+     */
+    class IncompleteCommandContext : public AsyncCompletion
     {
-  public:
-        IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
-          : session(&_session), msg(_msg) {}
-        virtual void operator() (bool sync);    // invoked when msg is completed.
-        void cancel();   // cancel pending incomplete callback [operator() above].
-
-        typedef boost::shared_ptr<IncompleteRcvMsg>  shared_ptr;
-        typedef std::deque<shared_ptr> deque;
-
-  private:
-        SessionState *session;
-        boost::intrusive_ptr<Message> msg;
+     public:
+        IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
+          : id(_id), session(ss) {}
+        virtual ~IncompleteCommandContext() {}
+
+        /* allows manual invokation of completion, used by IO thread to
+         * complete a command that was originally finished on a different
+         * thread.
+         */
+        void do_completion() { completed(true); }
+
+     protected:
+        SequenceNumber id;
+        SessionState    *session;
+    };
 
-        static void scheduledCompleter(boost::shared_ptr<deque>);
+    /** incomplete Message.transfer commands - inbound to broker from client
+     */
+    class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+    {
+     public:
+        IncompleteIngressMsgXfer( SessionState *ss,
+                                  SequenceNumber _id,
+                                  boost::intrusive_ptr<Message> msg )
+          : IncompleteCommandContext(ss, _id),
+          requiresAccept(msg->requiresAccept()),
+          requiresSync(msg->getFrames().getMethod()->isSync()) {};
+        virtual ~IncompleteIngressMsgXfer() {};
+
+     protected:
+        virtual void completed(bool);
+
+     private:
+        /** meta-info required to complete the message */
+        bool requiresAccept;
+        bool requiresSync;  // method's isSync() flag
     };
-    std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs;  // msgs pending completion
-    qpid::sys::Mutex incompleteRcvMsgsLock;
-    boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) {
-        boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg));
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock);
-        incompleteRcvMsgs[pending.get()] = pending;
-        return pending;
-    }
+    /** creates a command context suitable for use as an AsyncCompletion in a message */
+    boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
 
-    // holds msgs waiting for IO thread to run scheduledCompleter()
-    boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs;
+    /* A list of commands that are pending completion.  These commands are
+     * awaiting some set of asynchronous operations to finish (eg: store,
+     * flow-control, etc). before the command can be completed to the client
+     */
+    std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
+    // identifies those commands in incompleteCmds that are waiting for IO thread to run in order to be completed.
+    boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds;
+    qpid::sys::Mutex incompleteCmdsLock;  // locks both above containers
+
+    /** runs in IO thread, completes commands that where finished asynchronously. */
+    static void scheduledCompleter(boost::shared_ptr< std::list<SequenceNumber> > scheduledCmds,
+                                   SessionState *session);
 
     friend class SessionManager;
-    friend class IncompleteRcvMsg;
 };
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/MessageUtils.h?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/MessageUtils.h Wed Feb 16 18:58:06 2011
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/AsyncCompletion.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
@@ -28,6 +29,17 @@ using namespace qpid;
 using namespace broker;
 using namespace framing;
 
+namespace {
+    class DummyCompletion : public AsyncCompletion
+    {
+  public:
+        DummyCompletion() {}
+        virtual ~DummyCompletion() {}
+  protected:
+        void completed(bool) {}
+    };
+}
+
 namespace qpid {
 namespace tests {
 
@@ -50,6 +62,8 @@ struct MessageUtils
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
+        boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+        msg->setIngressCompletion(dc);
         return msg;
     }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp Wed Feb 16 18:58:06 2011
@@ -88,6 +88,8 @@ intrusive_ptr<Message> create_message(st
     msg->getFrames().append(method);
     msg->getFrames().append(header);
     msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+    boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
+    msg->setIngressCompletion(dc);
     return msg;
 }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/TxPublishTest.cpp Wed Feb 16 18:58:06 2011
@@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare)
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
     BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
     BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
-    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete());
+    BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
 }
 
 QPID_AUTO_TEST_CASE(testCommit)
@@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit)
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
     intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
 
-    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete());
+    BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
     BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
 
     BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/brokertest.py?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/brokertest.py Wed Feb 16 18:58:06 2011
@@ -29,6 +29,7 @@ from unittest import TestCase
 from copy import copy
 from threading import Thread, Lock, Condition
 from logging import getLogger
+import qmf.console
 
 log = getLogger("qpid.brokertest")
 
@@ -327,6 +328,10 @@ class Broker(Popen):
         log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
         self._log_ready = False
 
+    def startQmf(self, handler=None):
+        self.qmf_session = qmf.console.Session(handler)
+        self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
     def host(self): return self._host
 
     def port(self):

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py?rev=1071360&r1=1071359&r2=1071360&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py Wed Feb 16 18:58:06 2011
@@ -304,6 +304,180 @@ acl allow all all
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
+    def test_queue_flowlimit(self):
+        """Verify that the queue's flowlimit configuration and state are
+        correctly replicated.
+        """
+        return;  # @todo enable once flow control works in clusters
+        # start a cluster of two brokers
+        args = ["--log-enable=info+:broker"]
+        cluster = self.cluster(2, args)
+
+        # configure a queue with a specific flow limit on broker 0
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        cluster[0].startQmf()
+        for q in cluster[0].qmf_session.getObjects(_class="queue"):
+            if q.name == "flq":
+                oid = q.getObjectId()
+                break
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.flowStopCount, 5)
+        self.assertEqual(q.flowResumeCount, 3)
+        self.assertFalse(q.flowStopped)
+
+        # verify both brokers in cluster have same configuration
+        cluster[1].startQmf()
+        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q = qs[0]
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.flowStopCount, 5)
+        self.assertEqual(q.flowResumeCount, 3)
+        self.assertFalse(q.flowStopped)
+
+        # fill the queue on one broker until flow control is active
+        class BlockedSender(Thread):
+            def __init__(self): Thread.__init__(self)
+            def run(self):
+                for x in range(6):
+                    s0.send(Message(str(x)))
+
+        sender = BlockedSender()
+        sender.start()
+
+        start = time.time()
+        while time.time() < start + 5:
+            q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+            if q.flowStopped:
+                break;
+        self.assertTrue(q.flowStopped)
+
+        # verify flow control is active on other broker.
+        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertTrue(q.flowStopped)
+
+        # now drain the queue using a session to the other broker
+        ssn1 = cluster[1].connect().session()
+        r1 = ssn1.receiver("flq", capacity=6)
+        try:
+            while r1.fetch(timeout=0):
+                ssn1.acknowledge()
+        except Empty:
+            pass
+        sender.join()
+
+        # and verify both brokers see an unblocked queue
+        q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertFalse(q.flowStopped)
+        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertFalse(q.flowStopped)
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
+
+    def test_queue_flowlimit_join(self):
+        """Verify that the queue's flowlimit configuration and state are
+        correctly replicated to a newly joined broker.
+        """
+        return;  # @todo enable once flow control works in clusters
+        # start a cluster of two brokers
+        #args = ["--log-enable=info+:broker"]
+        args = ["--log-enable=debug"]
+        cluster = self.cluster(2, args)
+
+        # configure a queue with a specific flow limit on broker 0
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        cluster[0].startQmf()
+        for q in cluster[0].qmf_session.getObjects(_class="queue"):
+            if q.name == "flq":
+                oid = q.getObjectId()
+                break
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.flowStopCount, 5)
+        self.assertEqual(q.flowResumeCount, 3)
+        self.assertFalse(q.flowStopped)
+
+        # verify both brokers in cluster have same configuration
+        cluster[1].startQmf()
+        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q = qs[0]
+        self.assertEqual(q.name, "flq")
+        self.assertEqual(q.flowStopCount, 5)
+        self.assertEqual(q.flowResumeCount, 3)
+        self.assertFalse(q.flowStopped)
+
+        # fill the queue on one broker until flow control is active
+        class BlockedSender(Thread):
+            def __init__(self): Thread.__init__(self)
+            def run(self):
+                for x in range(6):
+                    s0.send(Message(str(x)))
+
+        sender = BlockedSender()
+        sender.start()
+
+        start = time.time()
+        while time.time() < start + 5:
+            q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+            if q.flowStopped:
+                break;
+        self.assertTrue(q.flowStopped)
+
+        # verify flow control is active on other broker.
+        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertTrue(q.flowStopped)
+
+        # add a new broker to the cluster
+        print("Start")
+        cluster.start()
+        print("Start Done")
+        
+        # todo: enable verification:
+        # cluster[2].startQmf()
+        # qs = cluster[2].qmf_session.getObjects(_objectId=oid)
+        # self.assertEqual(len(qs), 1)
+        # q = qs[0]
+        # self.assertEqual(q.name, "flq")
+        # self.assertEqual(q.flowStopCount, 5)
+        # self.assertEqual(q.flowResumeCount, 3)
+        # self.assertEqual(q.msgDepth, 5)
+        # self.assertFalse(q.flowStopped)
+        # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0]
+        # self.assertTrue(q.flowStopped)
+
+        # verify new member's queue config
+        # verify new member's queue flow setting
+
+
+
+
+        # now drain the queue using a session to the other broker
+        ssn1 = cluster[1].connect().session()
+        r1 = ssn1.receiver("flq", capacity=6)
+        try:
+            while r1.fetch(timeout=1):
+                ssn1.acknowledge()
+        except Empty:
+            pass
+        sender.join()
+
+        # and verify both brokers see an unblocked queue
+        q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertFalse(q.flowStopped)
+        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
+        self.assertFalse(q.flowStopped)
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
+
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org