You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2011/09/15 19:02:50 UTC

svn commit: r1171174 - in /qpid/trunk/qpid/cpp/src/qpid/broker: SemanticState.cpp SemanticState.h

Author: gsim
Date: Thu Sep 15 17:02:50 2011
New Revision: 1171174

URL: http://svn.apache.org/viewvc?rev=1171174&view=rev
Log:
QPID-3488: Ensure that message-stop clears any outstanding credit 'window'

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1171174&r1=1171173&r2=1171174&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Sep 15 17:02:50 2011
@@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImp
     acquire(_acquire),
     blocked(true),
     windowing(true),
+    windowActive(false),
     exclusive(_exclusive),
     resumeId(_resumeId),
     resumeTtl(_resumeTtl),
@@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::comple
 {
     if (!delivery.isComplete()) {
         delivery.complete();
-        if (windowing) {
+        if (windowing && windowActive) {
             if (msgCredit != 0xFFFFFFFF) msgCredit++;
             if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
         }
@@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCre
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (byteCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) byteCredit = value;
         else byteCredit += value;
@@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByt
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (msgCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) msgCredit = value;
         else msgCredit += value;
@@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush(
 {
     while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
-    stop();
+    msgCredit = 0;
+    byteCredit = 0;
 }
 
 void SemanticState::ConsumerImpl::stop()
@@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop()
     assertClusterSafe();
     msgCredit = 0;
     byteCredit = 0;
+    windowActive = false;
 }
 
 Queue::shared_ptr SemanticState::getQueue(const string& name) const {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1171174&r1=1171173&r2=1171174&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Sep 15 17:02:50 2011
@@ -81,6 +81,7 @@ class SemanticState : private boost::non
         const bool acquire;
         bool blocked;
         bool windowing;
+        bool windowActive;
         bool exclusive;
         std::string resumeId;
         uint64_t resumeTtl;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org