You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/08/12 20:29:03 UTC

svn commit: r803655 - in /qpid/trunk/qpid/cpp/src/qpid: broker/SemanticState.cpp broker/SemanticState.h broker/SessionAdapter.cpp broker/SessionState.cpp framing/IsInSequenceSet.h

Author: aconway
Date: Wed Aug 12 18:29:03 2009
New Revision: 803655

URL: http://svn.apache.org/viewvc?rev=803655&view=rev
Log:
Optimized handling of accepts and completions.

SemanticState::accept/completed now make a single pass through the
SequenceSet of commands and the unacked DeliveryRecord list in
parallel, rather than doing a pass through unacked for every range in
the SequenceSet.

Added:
    qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Aug 12 18:29:03 2009
@@ -31,6 +31,8 @@
 #include "qpid/broker/TxPublish.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/IsInSequenceSet.h"
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
 #include "qpid/broker/AclModule.h"
@@ -49,8 +51,9 @@
 namespace qpid {
 namespace broker {
 
-using std::mem_fun_ref;
+using namespace std;
 using boost::intrusive_ptr;
+using boost::bind;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
@@ -631,13 +634,27 @@
 }
 
 
-void SemanticState::accepted(DeliveryId first, DeliveryId last)
-{
-    AckRange range = findRange(first, last);
+// Test that a DeliveryRecord's ID is in a sequence set and some other
+// predicate on DeliveryRecord holds.
+template <class Predicate> struct IsInSequenceSetAnd {
+    IsInSequenceSet isInSet;
+    Predicate predicate;
+    IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
+    bool operator()(DeliveryRecord& dr) {
+        return isInSet(dr.getId()) && predicate(dr);
+    }
+};
+
+template<class Predicate> IsInSequenceSetAnd<Predicate>
+isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
+    return IsInSequenceSetAnd<Predicate>(s,p);
+}
+
+void SemanticState::accepted(const SequenceSet& commands) {
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
-        accumulatedAck.add(first, last);
+        accumulatedAck.add(commands);
         
         if (dtxBuffer.get()) {
             //if enlisted in a dtx, copy the relevant slice from
@@ -649,21 +666,28 @@
             //mark the relevant messages as 'ended' in unacked
             //if the messages are already completed, they can be
             //removed from the record
-            DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
-            unacked.erase(removed, range.end);
+            DeliveryRecords::iterator removed =
+                remove_if(unacked.begin(), unacked.end(),
+                          isInSequenceSetAnd(commands,
+                                             bind(&DeliveryRecord::setEnded, _1)));
+            unacked.erase(removed, unacked.end());
         }
     } else {
-        DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
-        unacked.erase(removed, range.end);
+        DeliveryRecords::iterator removed =
+            remove_if(unacked.begin(), unacked.end(),
+                      isInSequenceSetAnd(commands,
+                                         bind(&DeliveryRecord::accept, _1,
+                                              (TransactionContext*) 0)));
+        unacked.erase(removed, unacked.end());
     }
 }
 
-void SemanticState::completed(DeliveryId first, DeliveryId last)
-{
-    AckRange range = findRange(first, last);
-    
-    DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
-    unacked.erase(removed, range.end);
+void SemanticState::completed(const SequenceSet& commands) {
+    DeliveryRecords::iterator removed =
+        remove_if(unacked.begin(), unacked.end(),
+                  isInSequenceSetAnd(commands,
+                                     bind(&SemanticState::complete, this, _1)));
+    unacked.erase(removed, unacked.end());
     requestDispatch();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Aug 12 18:29:03 2009
@@ -206,9 +206,8 @@
     void reject(DeliveryId first, DeliveryId last);
     void handle(boost::intrusive_ptr<Message> msg);
 
-    //final 0-10 spec (completed and accepted are distinct):
-    void completed(DeliveryId deliveryTag, DeliveryId endTag);
-    void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+    void completed(const framing::SequenceSet& commands);
+    void accepted(const framing::SequenceSet& commands);
 
     void attached();
     void detached();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Aug 12 18:29:03 2009
@@ -429,13 +429,11 @@
     }
 } 
 
-
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
     HandlerHelper(s),
     releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
     releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
-    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
-    acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+    rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
  {}
 
 //
@@ -547,8 +545,7 @@
 
 void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
 {
-
-    commands.for_each(acceptOp);
+    state.accepted(commands);
 }
 
 framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Aug 12 18:29:03 2009
@@ -357,8 +357,7 @@
 
 void SessionState::senderCompleted(const SequenceSet& commands) {
     qpid::SessionState::senderCompleted(commands);
-    for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
-        semanticState.completed(i->first(), i->last());
+    semanticState.completed(commands);
 }
 
 void SessionState::readyToSend() {

Added: qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h?rev=803655&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h Wed Aug 12 18:29:03 2009
@@ -0,0 +1,51 @@
+#ifndef QPID_FRAMING_ISINSEQUENCESET_H
+#define QPID_FRAMING_ISINSEQUENCESET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+namespace framing {
+/**
+ * Functor to test whether values are in a sequence set.  This is a
+ * stateful functor that requires the values to be supplied in order
+ * and takes advantage of that ordering to avoid multiple scans.
+ */
+class IsInSequenceSet
+{
+  public:
+    IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {}
+
+    bool operator()(const SequenceNumber& n) {
+        while (i != set.rangesEnd() && i->end() <= n) ++i;
+        return i != set.rangesEnd() && i->begin() <= n;
+    }
+
+  private:
+    const SequenceSet& set;
+    SequenceSet::RangeIterator i;
+};
+
+}} // namespace qpid::framing
+
+#endif  /*!QPID_FRAMING_ISINSEQUENCESET_H*/



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org