You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/28 17:06:13 UTC
svn commit: r580380 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/ client/ framing/
Author: gsim
Date: Fri Sep 28 08:06:12 2007
New Revision: 580380
URL: http://svn.apache.org/viewvc?rev=580380&view=rev
Log:
Minor refactoring of execution layer
Set sync bit when session is in sync mode
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 28 08:06:12 2007
@@ -85,7 +85,7 @@
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ state.ackRange(*i, *(++i));
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Fri Sep 28 08:06:12 2007
@@ -43,8 +43,17 @@
future.sync(*session);
}
+ void wait()
+ {
+ future.wait(*session);
+ }
+
bool isComplete() {
- return future.isComplete();
+ return future.isComplete(*session);
+ }
+
+ bool isCompleteUpTo() {
+ return future.isCompleteUpTo(*session);
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Fri Sep 28 08:06:12 2007
@@ -35,6 +35,8 @@
virtual void sendFlushRequest() = 0;
virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
virtual Demux& getDemux() = 0;
+ virtual bool isComplete(const framing::SequenceNumber& id) = 0;
+ virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Sep 28 08:06:12 2007
@@ -63,7 +63,7 @@
AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+ arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter));
}
arriving->append(frame);
if (arriving->isComplete()) {
@@ -77,16 +77,12 @@
void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- completion.completed(outgoing.lwm);
- }
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- //TODO: need to manage (record and accumulate) ranges such
- //that we can implictly move the mark when appropriate
+ SequenceNumber mark(cumulative);
+ outgoingCompletionStatus.update(mark, range);
+ completion.completed(outgoingCompletionStatus.mark);
//TODO: signal listeners of early notification?
}
@@ -115,7 +111,7 @@
void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
{
- if (point > outgoing.lwm) {
+ if (point > outgoingCompletionStatus.mark) {
sendFlushRequest();
}
}
@@ -128,7 +124,7 @@
void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- if (point > outgoing.lwm) {
+ if (point > outgoingCompletionStatus.mark) {
sendSyncRequest();
}
}
@@ -142,11 +138,11 @@
void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
{
- if (id > completionStatus.mark) {
+ if (id > incomingCompletionStatus.mark) {
if (cumulative) {
- completionStatus.update(completionStatus.mark, id);
+ incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
} else {
- completionStatus.update(id, id);
+ incomingCompletionStatus.update(id, id);
}
}
if (send) {
@@ -158,8 +154,8 @@
void ExecutionHandler::sendCompletion()
{
SequenceNumberSet range;
- completionStatus.collectRanges(range);
- AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ incomingCompletionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));
out(frame);
}
@@ -170,7 +166,7 @@
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
{
- SequenceNumber id = ++outgoing.hwm;
+ SequenceNumber id = ++outgoingCounter;
if(l) {
completion.listenForResult(id, l);
}
@@ -227,4 +223,14 @@
} else {
out(header);
}
+}
+
+bool ExecutionHandler::isComplete(const SequenceNumber& id)
+{
+ return outgoingCompletionStatus.covers(id);
+}
+
+bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
+{
+ return outgoingCompletionStatus.mark >= id;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Sep 28 08:06:12 2007
@@ -41,15 +41,16 @@
public ChainableFrameHandler,
public Execution
{
- framing::Window incoming;
- framing::Window outgoing;
+ framing::SequenceNumber incomingCounter;
+ framing::AccumulatedAck incomingCompletionStatus;
+ framing::SequenceNumber outgoingCounter;
+ framing::AccumulatedAck outgoingCompletionStatus;
framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
Demux demux;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
- framing::AccumulatedAck completionStatus;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -76,6 +77,9 @@
void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
void syncTo(const framing::SequenceNumber& point);
void flushTo(const framing::SequenceNumber& point);
+
+ bool isComplete(const framing::SequenceNumber& id);
+ bool isCompleteUpTo(const framing::SequenceNumber& id);
void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
Correlator& getCorrelator() { return correlation; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h Fri Sep 28 08:06:12 2007
@@ -48,9 +48,16 @@
void sync(SessionCore& session)
{
- if (!complete) {
+ if (!isComplete(session)) {
+ session.getExecution().syncTo(command);
+ wait(session);
+ }
+ }
+
+ void wait(SessionCore& session)
+ {
+ if (!isComplete(session)) {
FutureCompletion callback;
- session.getExecution().flushTo(command);
session.getExecution().getCompletionTracker().listenForCompletion(
command,
boost::bind(&FutureCompletion::completed, &callback)
@@ -83,8 +90,12 @@
}
}
- bool isComplete() {
- return complete;
+ bool isComplete(SessionCore& session) {
+ return complete || session.getExecution().isComplete(command);
+ }
+
+ bool isCompleteUpTo(SessionCore& session) {
+ return complete || session.getExecution().isCompleteUpTo(command);
}
void setCommandId(const framing::SequenceNumber& id) { command = id; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Sep 28 08:06:12 2007
@@ -101,6 +101,8 @@
{
checkClosed();
+ command.getMethod()->setSync(sync);
+
Future f;
//any result/response listeners must be set before the command is sent
if (command.getMethod()->resultExpected()) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Fri Sep 28 08:06:12 2007
@@ -64,6 +64,7 @@
virtual uint8_t type() const { return METHOD_BODY; }
virtual bool isSync() const { return false; /*only ModelMethods can have the sync flag set*/ }
+ virtual void setSync(bool) const { /*only ModelMethods can have the sync flag set*/ }
AMQMethodBody* getMethod() { return this; }
const AMQMethodBody* getMethod() const { return this; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp Fri Sep 28 08:06:12 2007
@@ -100,6 +100,15 @@
}
}
+void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range)
+{
+ update(mark, cumulative);
+ for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
+ update(*i, *(++i));
+ }
+}
+
+
bool Range::contains(SequenceNumber i) const
{
return i >= start && i <= end;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h Fri Sep 28 08:06:12 2007
@@ -64,6 +64,7 @@
void clear();
bool covers(SequenceNumber tag) const;
void collectRanges(SequenceNumberSet& set) const;
+ void update(const SequenceNumber cumulative, const SequenceNumberSet& range);
};
std::ostream& operator<<(std::ostream&, const Range&);
std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h Fri Sep 28 08:06:12 2007
@@ -30,13 +30,14 @@
class ModelMethod : public AMQMethodBody
{
- ExecutionHeader header;
+ mutable ExecutionHeader header;
public:
virtual ~ModelMethod() {}
virtual void encode(Buffer& buffer) const { header.encode(buffer); }
virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }
virtual uint32_t size() const { return header.size(); }
virtual bool isSync() const { return header.getSync(); }
+ virtual void setSync(bool on) const { header.setSync(on); }
ExecutionHeader& getHeader() { return header; }
const ExecutionHeader& getHeader() const { return header; }
};