You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/08/12 20:29:03 UTC
svn commit: r803655 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/SemanticState.cpp broker/SemanticState.h broker/SessionAdapter.cpp
broker/SessionState.cpp framing/IsInSequenceSet.h
Author: aconway
Date: Wed Aug 12 18:29:03 2009
New Revision: 803655
URL: http://svn.apache.org/viewvc?rev=803655&view=rev
Log:
Optimized handling of accepts and completions.
SemanticState::accepted/completed now make a single pass through the
SequenceSet of commands and the unacked DeliveryRecord list in
parallel, rather than doing a pass through unacked for every range in
the SequenceSet.
Added:
qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Aug 12 18:29:03 2009
@@ -31,6 +31,8 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -49,8 +51,9 @@
namespace qpid {
namespace broker {
-using std::mem_fun_ref;
+using namespace std;
using boost::intrusive_ptr;
+using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -631,13 +634,27 @@
}
-void SemanticState::accepted(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
+// Test that a DeliveryRecord's ID is in a sequence set and some other
+// predicate on DeliveryRecord holds.
+template <class Predicate> struct IsInSequenceSetAnd {
+ IsInSequenceSet isInSet;
+ Predicate predicate;
+ IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
+ bool operator()(DeliveryRecord& dr) {
+ return isInSet(dr.getId()) && predicate(dr);
+ }
+};
+
+template<class Predicate> IsInSequenceSetAnd<Predicate>
+isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
+ return IsInSequenceSetAnd<Predicate>(s,p);
+}
+
+void SemanticState::accepted(const SequenceSet& commands) {
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.add(first, last);
+ accumulatedAck.add(commands);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
@@ -649,21 +666,28 @@
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::setEnded, _1)));
+ unacked.erase(removed, unacked.end());
}
} else {
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::accept, _1,
+ (TransactionContext*) 0)));
+ unacked.erase(removed, unacked.end());
}
}
-void SemanticState::completed(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
-
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.erase(removed, range.end);
+void SemanticState::completed(const SequenceSet& commands) {
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&SemanticState::complete, this, _1)));
+ unacked.erase(removed, unacked.end());
requestDispatch();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Aug 12 18:29:03 2009
@@ -206,9 +206,8 @@
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- //final 0-10 spec (completed and accepted are distinct):
- void completed(DeliveryId deliveryTag, DeliveryId endTag);
- void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+ void completed(const framing::SequenceSet& commands);
+ void accepted(const framing::SequenceSet& commands);
void attached();
void detached();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Aug 12 18:29:03 2009
@@ -429,13 +429,11 @@
}
}
-
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
- acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
//
@@ -547,8 +545,7 @@
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
-
- commands.for_each(acceptOp);
+ state.accepted(commands);
}
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=803655&r1=803654&r2=803655&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Aug 12 18:29:03 2009
@@ -357,8 +357,7 @@
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
- for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
- semanticState.completed(i->first(), i->last());
+ semanticState.completed(commands);
}
void SessionState::readyToSend() {
Added: qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h?rev=803655&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/IsInSequenceSet.h Wed Aug 12 18:29:03 2009
@@ -0,0 +1,51 @@
+#ifndef QPID_FRAMING_ISINSEQUENCESET_H
+#define QPID_FRAMING_ISINSEQUENCESET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+namespace framing {
+/**
+ * Functor to test whether values are in a sequence set. This is a
+ * stateful functor that requires the values to be supplied in order
+ * and takes advantage of that ordering to avoid multiple scans.
+ */
+class IsInSequenceSet
+{
+ public:
+ IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {}
+
+ bool operator()(const SequenceNumber& n) {
+ while (i != set.rangesEnd() && i->end() <= n) ++i;
+ return i != set.rangesEnd() && i->begin() <= n;
+ }
+
+ private:
+ const SequenceSet& set;
+ SequenceSet::RangeIterator i;
+};
+
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_ISINSEQUENCESET_H*/
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org