You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/28 17:06:13 UTC

svn commit: r580380 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/ client/ framing/

Author: gsim
Date: Fri Sep 28 08:06:12 2007
New Revision: 580380

URL: http://svn.apache.org/viewvc?rev=580380&view=rev
Log:
Minor refactoring of execution layer
Set sync bit when session is in sync mode


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 28 08:06:12 2007
@@ -85,7 +85,7 @@
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
         for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
-            state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+            state.ackRange(*i, *(++i));
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Fri Sep 28 08:06:12 2007
@@ -43,8 +43,17 @@
         future.sync(*session);
     }
 
+    void wait()
+    {
+        future.wait(*session);
+    }
+
     bool isComplete() {
-        return future.isComplete();
+        return future.isComplete(*session);
+    }
+
+    bool isCompleteUpTo() {
+        return future.isCompleteUpTo(*session);
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Execution.h Fri Sep 28 08:06:12 2007
@@ -35,6 +35,8 @@
     virtual void sendFlushRequest() = 0;
     virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
     virtual Demux& getDemux() = 0;
+    virtual bool isComplete(const framing::SequenceNumber& id) = 0;
+    virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Sep 28 08:06:12 2007
@@ -63,7 +63,7 @@
     AMQBody* body = frame.getBody();
     if (!invoke(body, this)) {
         if (!arriving) {
-            arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+            arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter));
         }
         arriving->append(frame);
         if (arriving->isComplete()) {
@@ -77,16 +77,12 @@
 
 void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
 {
-    SequenceNumber mark(cumulative);
-    if (outgoing.lwm < mark) {
-        outgoing.lwm = mark;
-        completion.completed(outgoing.lwm);
-    }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
-        //TODO: need to manage (record and accumulate) ranges such
-        //that we can implictly move the mark when appropriate
+        SequenceNumber mark(cumulative);        
+        outgoingCompletionStatus.update(mark, range);
+        completion.completed(outgoingCompletionStatus.mark);
 
         //TODO: signal listeners of early notification?         
     }
@@ -115,7 +111,7 @@
 
 void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
 {
-    if (point > outgoing.lwm) {
+    if (point > outgoingCompletionStatus.mark) {
         sendFlushRequest();
     }        
 }
@@ -128,7 +124,7 @@
 
 void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
 {
-    if (point > outgoing.lwm) {
+    if (point > outgoingCompletionStatus.mark) {
         sendSyncRequest();
     }        
 }
@@ -142,11 +138,11 @@
 
 void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
 {
-    if (id > completionStatus.mark) {
+    if (id > incomingCompletionStatus.mark) {
         if (cumulative) {
-            completionStatus.update(completionStatus.mark, id);
+            incomingCompletionStatus.update(incomingCompletionStatus.mark, id);
         } else {
-            completionStatus.update(id, id);            
+            incomingCompletionStatus.update(id, id);            
         }
     }
     if (send) {
@@ -158,8 +154,8 @@
 void ExecutionHandler::sendCompletion()
 {
     SequenceNumberSet range;
-    completionStatus.collectRanges(range);
-    AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+    incomingCompletionStatus.collectRanges(range);
+    AMQFrame frame(0, ExecutionCompleteBody(version, incomingCompletionStatus.mark.getValue(), range));
     out(frame);    
 }
 
@@ -170,7 +166,7 @@
 
 SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
 {
-    SequenceNumber id = ++outgoing.hwm;
+    SequenceNumber id = ++outgoingCounter;
     if(l) {
         completion.listenForResult(id, l);
     }
@@ -227,4 +223,14 @@
     } else {
         out(header);   
     }
+}
+
+bool ExecutionHandler::isComplete(const SequenceNumber& id)
+{
+    return outgoingCompletionStatus.covers(id);
+}
+
+bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id)
+{
+    return outgoingCompletionStatus.mark >= id;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Fri Sep 28 08:06:12 2007
@@ -41,15 +41,16 @@
     public ChainableFrameHandler,
     public Execution
 {
-    framing::Window incoming;
-    framing::Window outgoing;
+    framing::SequenceNumber incomingCounter;
+    framing::AccumulatedAck incomingCompletionStatus;
+    framing::SequenceNumber outgoingCounter;
+    framing::AccumulatedAck outgoingCompletionStatus;
     framing::FrameSet::shared_ptr arriving;
     Correlator correlation;
     CompletionTracker completion;
     Demux demux;
     framing::ProtocolVersion version;
     uint64_t maxFrameSize;
-    framing::AccumulatedAck completionStatus;
 
     void complete(uint32_t mark, const framing::SequenceNumberSet& range);    
     void flush();
@@ -76,6 +77,9 @@
     void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
     void syncTo(const framing::SequenceNumber& point);
     void flushTo(const framing::SequenceNumber& point);
+
+    bool isComplete(const framing::SequenceNumber& id);
+    bool isCompleteUpTo(const framing::SequenceNumber& id);
 
     void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
     Correlator& getCorrelator() { return correlation; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h Fri Sep 28 08:06:12 2007
@@ -48,9 +48,16 @@
 
     void sync(SessionCore& session)
     {
-        if (!complete) {
+        if (!isComplete(session)) {
+            session.getExecution().syncTo(command);
+            wait(session);
+        }
+    }
+
+    void wait(SessionCore& session)
+    {
+        if (!isComplete(session)) {
             FutureCompletion callback;
-            session.getExecution().flushTo(command);
             session.getExecution().getCompletionTracker().listenForCompletion(
                 command,                                                     
                 boost::bind(&FutureCompletion::completed, &callback)
@@ -83,8 +90,12 @@
         }
     }
 
-    bool isComplete() {
-        return complete;
+    bool isComplete(SessionCore& session) {
+        return complete || session.getExecution().isComplete(command);
+    }
+
+    bool isCompleteUpTo(SessionCore& session) {
+        return complete || session.getExecution().isCompleteUpTo(command);
     }
 
     void setCommandId(const framing::SequenceNumber& id) { command = id; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Fri Sep 28 08:06:12 2007
@@ -101,6 +101,8 @@
 { 
     checkClosed();
 
+    command.getMethod()->setSync(sync);
+
     Future f;
     //any result/response listeners must be set before the command is sent
     if (command.getMethod()->resultExpected()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQMethodBody.h Fri Sep 28 08:06:12 2007
@@ -64,6 +64,7 @@
     virtual uint8_t type() const { return METHOD_BODY; }
 
     virtual bool isSync() const { return false; /*only ModelMethods can have the sync flag set*/ }
+    virtual void setSync(bool) const { /*only ModelMethods can have the sync flag set*/ }
 
     AMQMethodBody* getMethod() { return this; }
     const AMQMethodBody* getMethod() const { return this; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp Fri Sep 28 08:06:12 2007
@@ -100,6 +100,15 @@
     }
 }
 
+void AccumulatedAck::update(const SequenceNumber cumulative, const SequenceNumberSet& range)
+{
+    update(mark, cumulative);
+    for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
+        update(*i, *(++i));
+    }
+}
+
+
 bool Range::contains(SequenceNumber i) const 
 { 
     return i >= start && i <= end; 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AccumulatedAck.h Fri Sep 28 08:06:12 2007
@@ -64,6 +64,7 @@
             void clear();
             bool covers(SequenceNumber tag) const;
             void collectRanges(SequenceNumberSet& set) const;
+            void update(const SequenceNumber cumulative, const SequenceNumberSet& range);
         };
         std::ostream& operator<<(std::ostream&, const Range&);
         std::ostream& operator<<(std::ostream&, const AccumulatedAck&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h?rev=580380&r1=580379&r2=580380&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ModelMethod.h Fri Sep 28 08:06:12 2007
@@ -30,13 +30,14 @@
 
 class ModelMethod : public AMQMethodBody 
 {
-    ExecutionHeader header;
+    mutable ExecutionHeader header;
 public:    
     virtual ~ModelMethod() {}
     virtual void encode(Buffer& buffer) const { header.encode(buffer); }
     virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }
     virtual uint32_t size() const { return header.size(); } 
     virtual bool isSync() const { return header.getSync(); }
+    virtual void setSync(bool on) const { header.setSync(on); }
     ExecutionHeader& getHeader() { return header; } 
     const ExecutionHeader& getHeader()  const { return header; } 
 };