You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/01/27 22:17:48 UTC

svn commit: r738247 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/client/ qpid/sys/ tests/

Author: astitcher
Date: Tue Jan 27 21:17:47 2009
New Revision: 738247

URL: http://svn.apache.org/viewvc?rev=738247&view=rev
Log:
Producer side rate throttling:
    This uses the Message.Flow command to send credit from
    broker to client to ensure that the client doesn't
    exceed a rate configured on the broker per session.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/RateFlowcontrol.h
    qpid/trunk/qpid/cpp/src/tests/RateFlowcontrolTest.cpp
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Tue Jan 27 21:17:47 2009
@@ -539,6 +539,7 @@
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/QueueRegistry.h \
+  qpid/broker/RateFlowcontrol.h \
   qpid/broker/RateTracker.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jan 27 21:17:47 2009
@@ -90,7 +90,8 @@
     replayHardLimit(0),
     queueLimit(100*1048576/*100M default limit*/),
     tcpNoDelay(false),
-    requireEncrypted(false)
+    requireEncrypted(false),
+    maxSessionRate(0)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -119,7 +120,8 @@
         ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") 
         ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
         ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
-        ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)");
+        ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
+        ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
 }
 
 const std::string empty;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jan 27 21:17:47 2009
@@ -105,6 +105,7 @@
         bool tcpNoDelay;
         bool requireEncrypted;
         std::string knownHosts;
+        uint32_t maxSessionRate;
     };
  
   private:

Added: qpid/trunk/qpid/cpp/src/qpid/broker/RateFlowcontrol.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RateFlowcontrol.h?rev=738247&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RateFlowcontrol.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RateFlowcontrol.h Tue Jan 27 21:17:47 2009
@@ -0,0 +1,115 @@
+#ifndef broker_RateFlowcontrol_h
+#define broker_RateFlowcontrol_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include "qpid/sys/IntegerTypes.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+// Class to keep track of issuing flow control to make sure that the peer doesn't exceed
+// a given message rate
+//
+// Create the object with the target rate
+// Then call sentCredit() whenever credit is issued to the peer
+// Call receivedMessage() whenever a message is received, it returns the credit to issue.
+//
+// sentCredit() can sensibly be called with a 0 parameter to indicate
+// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection
+// to allow our peer to send messages)
+//
+// receivedMessage() can be called with 0 to indicate that we've not received a message, but
+// tell me what credit I can send.
+class RateFlowcontrol {
+    uint32_t rate; // messages per second
+    uint32_t maxCredit; // max credit issued to client (issued at start)
+    uint32_t requestedCredit; // messages received for which credit has not yet been re-issued
+    qpid::sys::AbsTime creditSent; // time of the last sentCredit(); FAR_FUTURE until the first one
+
+public:
+    RateFlowcontrol(uint32_t r) :
+         rate(r),
+         maxCredit(0),
+         requestedCredit(0),
+         creditSent(qpid::sys::FAR_FUTURE)
+    {}
+
+    uint32_t getRate() const {
+        return rate;
+    }
+    void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit);
+    uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t  msgs);
+    bool flowStopped() const;
+};
+
+// Record that 'credit' was issued to the peer at time t.
+inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) {
+    // If the client isn't currently requesting credit (ie it's not sent us anything yet) then
+    // this credit goes to the max credit held by the client (it can't go to reduce credit
+    // less than 0)
+    // The comparison is done on the unsigned values: squeezing the difference through a
+    // signed 32 bit temporary would misclassify differences of 2^31 or more.
+    if ( credit>requestedCredit ) {
+        maxCredit += credit - requestedCredit;
+        requestedCredit = 0;
+    } else {
+        requestedCredit -= credit;
+    }
+    creditSent = t;
+}
+
+// Account for 'msgs' newly received messages and return how much credit may be
+// issued at time t: rate * (time since the last sentCredit()), rounded down and
+// capped at the credit currently requested.
+inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t  msgs) {
+    requestedCredit +=msgs;
+    qpid::sys::Duration d(creditSent, t);
+    const int64_t elapsed = d;
+    // Negative before the first sentCredit() (creditSent is still FAR_FUTURE): nothing
+    // has been earned, and multiplying such a huge value by rate would overflow.
+    if ( elapsed<=0 ) return 0;
+
+    const int64_t nsPerSec = qpid::sys::TIME_SEC;
+    const int64_t wanted = requestedCredit;
+    const int64_t wholeSecs = elapsed / nsPerSec;
+    // Once rate * wholeSecs exceeds what is wanted the cap decides the answer, so
+    // return it without forming a product that could overflow after a long idle period.
+    if ( rate!=0 && wholeSecs>wanted/rate ) return requestedCredit;
+
+    // Same value as rate * elapsed / nsPerSec, with the whole and fractional seconds
+    // multiplied separately so that neither product can exceed the int64_t range.
+    const int64_t earned = rate*wholeSecs + rate*(elapsed%nsPerSec)/nsPerSec;
+    return static_cast<uint32_t>(std::min(earned, wanted));
+}
+
+// True once the outstanding requested credit has reached the maximum credit issued.
+inline bool RateFlowcontrol::flowStopped() const {
+    return requestedCredit >= maxCredit;
+}
+
+}}
+
+#endif // broker_RateFlowcontrol_h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jan 27 21:17:47 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,8 @@
 #include "DeliveryRecord.h"
 #include "SessionManager.h"
 #include "SessionHandler.h"
+#include "RateFlowcontrol.h"
+#include "Timer.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/framing/AMQMethodBody.h"
@@ -31,6 +33,7 @@
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementBroker.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
@@ -46,17 +49,19 @@
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using qpid::sys::AbsTime;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 SessionState::SessionState(
-    Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) 
+    Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
     : qpid::SessionState(id, config),
       broker(b), handler(&h),
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
       enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
-      mgmtObject(0)
+      mgmtObject(0),
+      rateFlowcontrol(0)
 {
     Manageable* parent = broker.GetVhostObject ();
     if (parent != 0) {
@@ -71,12 +76,24 @@
             agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
         }
     }
+    uint32_t maxRate = broker.getOptions().maxSessionRate;
+    if (maxRate) {
+        rateFlowcontrol = new RateFlowcontrol(maxRate);
+    }
     attach(h);
 }
 
 SessionState::~SessionState() {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
+
+    // Cancel any pending credit task before releasing the rate controller it refers to.
+    if (flowControlTimer)
+        flowControlTimer->cancel();
+
+    // Allocated in the constructor when a maximum session rate is configured;
+    // deleting a null pointer is a no-op.
+    delete rateFlowcontrol;
 }
 
 AMQP_ClientProxy& SessionState::getProxy() {
@@ -102,7 +114,7 @@
         mgmtObject->set_attached  (0);
 }
 
-void SessionState::disableOutput() 
+void SessionState::disableOutput()
 {
     semanticState.detached();//prevents further activateOutput calls until reattached
     getConnection().outputTasks.removeOutputTask(&semanticState);
@@ -120,12 +132,12 @@
 }
 
 void SessionState::activateOutput() {
-    if (isAttached()) 
+    if (isAttached())
         getConnection().outputTasks.activateOutput();
 }
 
 void SessionState::giveReadCredit(int32_t credit) {
-    if (isAttached()) 
+    if (isAttached())
         getConnection().outputTasks.giveReadCredit(credit);
 }
 
@@ -170,18 +182,49 @@
 
 void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
     Invoker::Result invocation = invoke(adapter, *method);
-    receiverCompleted(id);                                    
+    receiverCompleted(id);
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
     } else if (invocation.hasResult()) {
         getProxy().getExecution().result(id, invocation.getResult());
     }
-    if (method->isSync()) { 
+    if (method->isSync()) {
         incomplete.process(enqueuedOp, true);
         sendAcceptAndCompletion();
     }
 }
 
+struct ScheduledCreditTask : public TimerTask {
+    Timer& timer;
+    SessionState& sessionState;
+    RateFlowcontrol& flowControl;
+    ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+                        SessionState& s, RateFlowcontrol& f) :
+        TimerTask(d),
+        timer(t),
+        sessionState(s),
+        flowControl(f)
+    {}
+
+    void fire() {
+        // This is the best we can currently do to avoid a destruction/fire race
+        if (!isCancelled()) {
+            // Send credit
+            AbsTime now = AbsTime::now();
+            uint32_t sendCredit = flowControl.receivedMessage(now, 0);
+            if ( sendCredit>0 ) {
+                QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
+                sessionState.getProxy().getMessage().flow("", 0, sendCredit);
+                flowControl.sentCredit(now, sendCredit);
+            } else if ( flowControl.flowStopped() ) {
+                QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+                reset();
+                timer.add(this);
+            }
+        }
+    }
+};
+
 void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
 {
     if (frame.getBof() && frame.getBos()) //start of frameset
@@ -194,10 +237,10 @@
             AMQFrame header((AMQHeaderBody()));
             header.setBof(false);
             header.setEof(false);
-            msg->getFrames().append(header);                        
+            msg->getFrames().append(header);
         }
         msg->setPublisher(&getConnection());
-        semanticState.handle(msg);        
+        semanticState.handle(msg);
         msgBuilder.end();
 
         if (msg->isEnqueueComplete()) {
@@ -206,14 +249,39 @@
             incomplete.add(msg);
         }
 
-        //hold up execution until async enqueue is complete        
-        if (msg->getFrames().getMethod()->isSync()) { 
+        //hold up execution until async enqueue is complete
+        if (msg->getFrames().getMethod()->isSync()) {
             incomplete.process(enqueuedOp, true);
             sendAcceptAndCompletion();
         } else {
             incomplete.process(enqueuedOp, false);
         }
     }
+
+    // Handle producer session flow control
+    if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
+        // Check for violating flow control
+        if ( rateFlowcontrol->flowStopped() ) {
+            QPID_LOG(warning, getId() << ": producer throttling violation");
+            // TODO: Probably do message.stop("") first time then disconnect
+            getProxy().getMessage().stop("");
+        } else {
+            AbsTime now = AbsTime::now();
+            uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
+            if ( sendCredit>0 ) {
+                QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+                getProxy().getMessage().flow("", 0, sendCredit);
+                rateFlowcontrol->sentCredit(now, sendCredit);
+            } else if ( rateFlowcontrol->flowStopped() ) {
+                QPID_LOG(debug, getId() << ": Schedule sending credit");
+                Timer& timer = getBroker().getTimer();
+                // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+                sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+                flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
+                timer.add(flowControlTimer);
+            }
+        }
+    }
 }
 
 void SessionState::sendAcceptAndCompletion()
@@ -222,7 +290,7 @@
         getProxy().getMessage().accept(accepted);
         accepted.clear();
     }
-    sendCompletion(); 
+    sendCompletion();
 }
 
 void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
@@ -240,7 +308,7 @@
     if (m == 0 || m->isContentBearing()) {
         handleContent(frame, commandId);
     } else if (frame.getBof() && frame.getEof()) {
-        handleCommand(frame.getMethod(), commandId);                
+        handleCommand(frame.getMethod(), commandId);
     } else {
         throw InternalErrorException("Cannot handle multi-frame command segments yet");
     }
@@ -265,8 +333,8 @@
     }
 }
 
-void SessionState::sendCompletion() { 
-    handler->sendCompletion(); 
+void SessionState::sendCompletion() {
+    handler->sendCompletion();
 }
 
 void SessionState::senderCompleted(const SequenceSet& commands) {
@@ -282,6 +350,14 @@
     sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
     tasks.addOutputTask(&semanticState);
     tasks.activateOutput();
+
+    if (rateFlowcontrol) {
+        // Issue initial credit - heuristic: the smaller of 100 messages or one second's worth
+        QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+        getProxy().getMessage().setFlowMode("", 0);
+        getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
+        rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+    }
 }
 
 Broker& SessionState::getBroker() { return broker; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jan 27 21:17:47 2009
@@ -55,6 +55,8 @@
 class Message;
 class SessionHandler;
 class SessionManager;
+class RateFlowcontrol;
+class TimerTask;
 
 /**
  * Broker-side session state includes session's handler chains, which
@@ -132,7 +134,11 @@
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
     
-  friend class SessionManager;
+    // State used for producer flow control (rate limited)
+    RateFlowcontrol* rateFlowcontrol;
+    boost::intrusive_ptr<TimerTask> flowControlTimer;
+
+    friend class SessionManager;
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Jan 27 21:17:47 2009
@@ -62,7 +62,8 @@
       ioHandler(*this),
       proxy(ioHandler),
       nextIn(0),
-      nextOut(0)
+      nextOut(0),
+      sendMsgCredit(0)
 {
     channel.next = connectionShared.get();
 }
@@ -76,6 +77,7 @@
             handleClosed();
             state.waitWaiters();
         }
+        delete sendMsgCredit;
     }
     boost::shared_ptr<ConnectionImpl> c =  connectionWeak.lock();
     if (c) c->erase(channel);
@@ -359,7 +361,7 @@
     uint64_t data_length = content.getData().length();
     if(data_length > 0){
         header.setLastSegment(false);
-        handleOut(header);   
+        handleContentOut(header);   
         /*Note: end of frame marker included in overhead but not in size*/
         const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); 
 
@@ -388,7 +390,7 @@
             }
         }
     } else {
-        handleOut(header);   
+        handleContentOut(header);   
     }
 }
 
@@ -414,16 +416,18 @@
 void SessionImpl::handleIn(AMQFrame& frame) // network thread
 {
     try {
-        if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
-            if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
-                //make sure the command id sequence and completion
-                //tracking takes account of execution commands
-                Lock l(state);
-                completedIn.add(nextIn++);            
-            } else {
-                //if not handled by this class, its for the application:
-                deliver(frame);
-            }
+        if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+            ;
+        } else if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) {
+            //make sure the command id sequence and completion
+            //tracking takes account of execution commands
+            Lock l(state);
+            completedIn.add(nextIn++);            
+        } else if (invoke(static_cast<MessageHandler&>(*this), *frame.getBody())) {
+            ;
+        } else {
+            //if not handled by this class, it's for the application:
+            deliver(frame);
         }
     }
     catch (const SessionException& e) {
@@ -439,6 +443,14 @@
     sendFrame(frame, true);
 }
 
+void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
+{
+    if (sendMsgCredit) {
+        sendMsgCredit->acquire();
+    }
+    sendFrame(frame, true);
+}
+
 void SessionImpl::proxyOut(AMQFrame& frame) // network thread
 {
     //Note: this case is treated slightly differently that command
@@ -631,6 +643,79 @@
         setTimeout(0);
 }
 
+// Message methods:
+void SessionImpl::accept(const qpid::framing::SequenceSet&)
+{
+}
+
+void SessionImpl::reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&)
+{
+}
+
+void SessionImpl::release(const qpid::framing::SequenceSet&, bool)
+{
+}
+
+MessageResumeResult SessionImpl::resume(const std::string&, const std::string&)
+{
+    throw NotImplementedException("resuming transfers not yet supported");
+}
+
+namespace {
+    const std::string QPID_SESSION_DEST = "";
+    const uint8_t FLOW_MODE_CREDIT = 0;
+    const uint8_t CREDIT_MODE_MSG = 0;
+}
+
+// Switch on producer flow control for the session destination (credit mode only).
+void SessionImpl::setFlowMode(const std::string& dest, uint8_t flowMode)
+{
+    if ( dest != QPID_SESSION_DEST ) {
+        QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+        return;
+    }
+
+    if ( flowMode != FLOW_MODE_CREDIT ) {
+        throw NotImplementedException("window flow control mode not supported by producer");
+    }
+    Lock l(state);
+    if (sendMsgCredit) {
+        // Already set up by an earlier set-flow-mode: start again from zero credit but keep
+        // the same semaphore, so the old one isn't leaked and a sender waiting in
+        // handleContentOut() is not left holding an object nobody will release.
+        sendMsgCredit->forceLock();
+    } else {
+        sendMsgCredit = new sys::Semaphore(0);
+    }
+}
+
+// Grant 'credit' more messages; any mode other than CREDIT_MODE_MSG is ignored.
+void SessionImpl::flow(const std::string& dest, uint8_t mode, uint32_t credit)
+{
+    if ( dest != QPID_SESSION_DEST ) {
+        QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+        return;
+    }
+
+    if ( mode != CREDIT_MODE_MSG ) {
+        return;
+    }
+    if (sendMsgCredit) {
+        sendMsgCredit->release(credit);
+    }
+}
+
+// Zero the remaining credit (forceLock() resets the semaphore count).
+void SessionImpl::stop(const std::string& dest)
+{
+    if ( dest != QPID_SESSION_DEST ) {
+        QPID_LOG(warning, "Ignoring flow control for unknown destination: " << dest);
+        return;
+    }
+    if (sendMsgCredit) {
+        sendMsgCredit->forceLock();
+    }
+}
 
 //private utility methods:
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Tue Jan 27 21:17:47 2009
@@ -61,7 +61,8 @@
 class SessionImpl : public framing::FrameHandler::InOutHandler,
                     public Execution,
                     private framing::AMQP_ClientOperations::SessionHandler,
-                    private framing::AMQP_ClientOperations::ExecutionHandler
+                    private framing::AMQP_ClientOperations::ExecutionHandler,
+                    private framing::AMQP_ClientOperations::MessageHandler
 {
 public:
     SessionImpl(const std::string& name, shared_ptr<ConnectionImpl>);
@@ -123,6 +124,7 @@
     };
     typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
     typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
+    typedef framing::AMQP_ClientOperations::MessageHandler MessageHandler;
     typedef sys::StateMonitor<State, DETACHED> StateMonitor;
     typedef StateMonitor::Set States;
 
@@ -138,6 +140,7 @@
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
+    void handleContentOut(framing::AMQFrame& frame);
     /**
      * Sends session controls. This case is treated slightly
      * differently than command frames sent by the application via
@@ -181,6 +184,18 @@
                    uint8_t fieldIndex,
                    const std::string& description,
                    const framing::FieldTable& errorInfo);
+                   
+    // Note: Following methods are called by network thread in
+    // response to message commands from the broker
+    // EXCEPT Message.Transfer
+    void accept(const qpid::framing::SequenceSet&);
+    void reject(const qpid::framing::SequenceSet&, uint16_t, const std::string&);
+    void release(const qpid::framing::SequenceSet&, bool);
+    qpid::framing::MessageResumeResult resume(const std::string&, const std::string&);
+    void setFlowMode(const std::string&, uint8_t);
+    void flow(const std::string&, uint8_t, uint32_t);
+    void stop(const std::string&);
+
 
     sys::ExceptionHolder exceptionHolder;
     mutable StateMonitor state;
@@ -211,6 +226,9 @@
 
     SessionState sessionState;
 
+    // Only keep track of message credit 
+    sys::Semaphore* sendMsgCredit;
+
   friend class client::SessionHandler;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Semaphore.h Tue Jan 27 21:17:47 2009
@@ -51,10 +51,22 @@
         count--;
     }
 
+    void release(uint n)
+    {
+        Monitor::ScopedLock l(monitor);
+        if (count==0) monitor.notifyAll();
+        count+=n;
+    }
+
     void release()
     {
+        release(1);
+    }
+
+    void forceLock()
+    {
         Monitor::ScopedLock l(monitor);
-        if (!count++) monitor.notifyAll();
+        count = 0;
     }
 
 private:

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=738247&r1=738246&r2=738247&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan 27 21:17:47 2009
@@ -91,7 +91,8 @@
 	ConsoleTest.cpp \
 	QueueEvents.cpp \
 	ProxyTest.cpp \
-	RetryList.cpp
+	RetryList.cpp \
+	RateFlowcontrolTest.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp

Added: qpid/trunk/qpid/cpp/src/tests/RateFlowcontrolTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/RateFlowcontrolTest.cpp?rev=738247&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/RateFlowcontrolTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/RateFlowcontrolTest.cpp Tue Jan 27 21:17:47 2009
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/broker/RateFlowcontrol.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QPID_AUTO_TEST_SUITE(RateFlowcontrolTestSuite)
+
+QPID_AUTO_TEST_CASE(RateFlowcontrolTest)
+{
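+    // Walk a 100 msg/s controller through a scripted timeline and check the credit
+    // it offers at each step; time is advanced by building AbsTime values, not by waiting.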
+    
+   RateFlowcontrol fc(100);
+   AbsTime n=AbsTime::now();
+   
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
+   
+   fc.sentCredit(n, 0);
+   
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 0U );
+   fc.sentCredit(n, 100);
+
+   Duration d=250*TIME_MSEC;
+   
+   n = AbsTime(n,d);
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 48), 25U );
+   fc.sentCredit(n, 25);
+
+   n = AbsTime(n,d);
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 23U );
+   fc.sentCredit(n, 23);
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 100), 0U);
+   BOOST_CHECK(fc.flowStopped());
+
+   n = AbsTime(n,d);
+   n = AbsTime(n,d);
+   BOOST_CHECK_EQUAL( fc.receivedMessage(n, 0), 50U);
+}
+
+QPID_AUTO_TEST_SUITE_END()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org