You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2011/09/15 19:02:50 UTC
svn commit: r1171174 - in /qpid/trunk/qpid/cpp/src/qpid/broker: SemanticState.cpp SemanticState.h
Author: gsim
Date: Thu Sep 15 17:02:50 2011
New Revision: 1171174
URL: http://svn.apache.org/viewvc?rev=1171174&view=rev
Log:
QPID-3488: Ensure that message-stop clears any outstanding credit 'window'
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1171174&r1=1171173&r2=1171174&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Sep 15 17:02:50 2011
@@ -281,6 +281,7 @@ SemanticState::ConsumerImpl::ConsumerImp
acquire(_acquire),
blocked(true),
windowing(true),
+ windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
@@ -531,7 +532,7 @@ void SemanticState::ConsumerImpl::comple
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing) {
+ if (windowing && windowActive) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
}
@@ -627,6 +628,7 @@ void SemanticState::ConsumerImpl::setCre
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -636,6 +638,7 @@ void SemanticState::ConsumerImpl::addByt
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
+ if (windowing) windowActive = true;
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -656,7 +659,8 @@ void SemanticState::ConsumerImpl::flush(
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- stop();
+ msgCredit = 0;
+ byteCredit = 0;
}
void SemanticState::ConsumerImpl::stop()
@@ -664,6 +668,7 @@ void SemanticState::ConsumerImpl::stop()
assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
+ windowActive = false;
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1171174&r1=1171173&r2=1171174&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Sep 15 17:02:50 2011
@@ -81,6 +81,7 @@ class SemanticState : private boost::non
const bool acquire;
bool blocked;
bool windowing;
+ bool windowActive;
bool exclusive;
std::string resumeId;
uint64_t resumeTtl;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org