You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/06/04 19:24:10 UTC
svn commit: r663318 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
SessionState.cpp SessionState.h amqp_0_10/SessionHandler.cpp
Author: aconway
Date: Wed Jun 4 10:24:10 2008
New Revision: 663318
URL: http://svn.apache.org/viewvc?rev=663318&view=rev
Log:
Request a timely reply to session.completed based on configured flush interval.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=663318&r1=663317&r2=663318&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Jun 4 10:24:10 2008
@@ -135,10 +135,6 @@
}
bool SessionState::senderNeedKnownCompleted() const {
- // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0
- // means never send spontaneous flush, but sends a knownCompleted for
- // every completed. Need separate configuration?
- //
return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
}
@@ -187,6 +183,7 @@
if (isControl(f)) return true; // Ignore control frames.
stateful = true;
receiver.expected.advance(f);
+ receiver.bytesSinceKnownCompleted += f.size();
bool firstTime = receiver.expected > receiver.received;
if (firstTime) {
receiver.received = receiver.expected;
@@ -211,10 +208,15 @@
void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
if (!commands.empty() && commands.back() > receiver.received.command)
throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands."));
+ receiver.bytesSinceKnownCompleted=0;
receiver.unknownCompleted -= commands;
QPID_LOG(debug, getId() << ": receiver known completed: " << commands << " unknown: " << receiver.unknownCompleted);
}
+bool SessionState::receiverNeedKnownCompleted() const {
+ return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=663318&r1=663317&r2=663318&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Jun 4 10:24:10 2008
@@ -148,6 +148,11 @@
/** Peer has indicated commands are known completed */
virtual void receiverKnownCompleted(const SequenceSet& commands);
+ /** True if the next completed control should set the timely-reply argument
+ * to request a known-completed response.
+ */
+ virtual bool receiverNeedKnownCompleted() const;
+
/** Get the incoming command point */
virtual const SessionPoint& receiverGetExpected() const;
@@ -184,6 +189,7 @@
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not be known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
+ size_t bytesSinceKnownCompleted; // Bytes received since the peer last sent a knownCompleted.
} receiver;
SessionId id;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=663318&r1=663317&r2=663318&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Wed Jun 4 10:24:10 2008
@@ -243,7 +243,7 @@
void SessionHandler::sendCompletion() {
checkAttached();
const SequenceSet& c = getState()->receiverGetUnknownComplete();
- peer.completed(c, c.span() > 1000);
+ peer.completed(c, getState()->receiverNeedKnownCompleted());
}
void SessionHandler::sendAttach(bool force) {