You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [10/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 21 01:19:00 2011
@@ -70,12 +70,14 @@ SemanticState::SemanticState(DeliveryAda
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
+ authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
userID(getSession().getConnection().getUserId()),
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
closeComplete(false)
-{}
+{
+ acl = getSession().getBroker().getAcl();
+}
SemanticState::~SemanticState() {
closed();
@@ -86,7 +88,7 @@ void SemanticState::closed() {
//prevent requeued messages being redelivered to consumers
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
disable(i->second);
- }
+ }
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
@@ -105,24 +107,16 @@ bool SemanticState::exists(const string&
return consumers.find(consumerTag) != consumers.end();
}
-namespace {
- const std::string SEPARATOR("::");
-}
-
-void SemanticState::consume(const string& tag,
+void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
- // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
- // Create a globally unique name so the broker can identify individual consumers
- std::string name = session.getSessionId().str() + SEPARATOR + tag;
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
}
-bool SemanticState::cancel(const string& tag)
-{
+void SemanticState::cancel(const string& tag){
ConsumerImplMap::iterator i = consumers.find(tag);
if (i != consumers.end()) {
cancel(i->second);
@@ -130,13 +124,7 @@ bool SemanticState::cancel(const string&
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
- //can also remove any records that are now redundant
- DeliveryRecords::iterator removed =
- remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
- unacked.erase(removed, unacked.end());
- return true;
- } else {
- return false;
+
}
}
@@ -179,8 +167,8 @@ void SemanticState::startDtx(const std::
if (!dtxSelected) {
throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
- dtxBuffer.reset(new DtxBuffer(xid));
- txBuffer = dtxBuffer;
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+ txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
if (join) {
mgr.join(xid, dtxBuffer);
} else {
@@ -206,7 +194,7 @@ void SemanticState::endDtx(const std::st
dtxBuffer->fail();
} else {
dtxBuffer->markEnded();
- }
+ }
dtxBuffer.reset();
}
@@ -248,7 +236,7 @@ void SemanticState::resumeDtx(const std:
checkDtxTimeout();
dtxBuffer->setSuspended(false);
- txBuffer = dtxBuffer;
+ txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
}
void SemanticState::checkDtxTimeout()
@@ -266,33 +254,31 @@ void SemanticState::record(const Deliver
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+ const string& _name,
+ Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
- const string& _tag,
const string& _resumeId,
uint64_t _resumeTtl,
const framing::FieldTable& _arguments
-) :
- Consumer(_name, _acquire),
- parent(_parent),
- queue(_queue),
- ackExpected(ack),
+) :
+ Consumer(_acquire),
+ parent(_parent),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
acquire(_acquire),
- blocked(true),
+ blocked(true),
windowing(true),
- windowActive(false),
exclusive(_exclusive),
resumeId(_resumeId),
- tag(_tag),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -303,10 +289,10 @@ SemanticState::ConsumerImpl::ConsumerImp
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
-
+
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -338,16 +324,16 @@ bool SemanticState::ConsumerImpl::delive
{
assertClusterSafe();
allocateCredit(msg.payload);
- DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
+ DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
parent->deliver(record, sync);
+ if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
- }
- if (acquire && !ackExpected) { // auto acquire && auto accept
- queue->dequeue(0 /*ctxt*/, msg);
- record.setEnded();
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg);
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -365,7 +351,7 @@ bool SemanticState::ConsumerImpl::accept
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
- //
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -377,7 +363,7 @@ struct ConsumerName {
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
- return o << pc.consumer.getTag() << " on "
+ return o << pc.consumer.getName() << " on "
<< pc.consumer.getParent().getSession().getSessionId();
}
}
@@ -386,7 +372,7 @@ void SemanticState::ConsumerImpl::alloca
{
assertClusterSafe();
uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
+ uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
msgCredit--;
}
@@ -396,7 +382,7 @@ void SemanticState::ConsumerImpl::alloca
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
+
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
@@ -410,7 +396,7 @@ bool SemanticState::ConsumerImpl::checkC
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticState::ConsumerImpl::~ConsumerImpl()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -428,7 +414,7 @@ void SemanticState::unsubscribe(Consumer
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
Queue::tryAutoDelete(session.getBroker(), queue);
}
}
@@ -470,23 +456,23 @@ const std::string nullstring;
}
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
-
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+
std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
+ if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
cacheExchange->setProperties(msg);
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+
if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
}
- AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
@@ -498,7 +484,7 @@ void SemanticState::route(intrusive_ptr<
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
- //TODO:else if accept-mode is explicit, reject it
+ //TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -527,7 +513,7 @@ void SemanticState::ConsumerImpl::reques
}
bool SemanticState::complete(DeliveryRecord& delivery)
-{
+{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
@@ -539,7 +525,7 @@ void SemanticState::ConsumerImpl::comple
{
if (!delivery.isComplete()) {
delivery.complete();
- if (windowing && windowActive) {
+ if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
}
@@ -555,7 +541,7 @@ void SemanticState::recover(bool requeue
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
+ for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
@@ -568,61 +554,50 @@ void SemanticState::deliver(DeliveryReco
return deliveryAdapter.deliver(msg, sync);
}
-const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
-{
- ConsumerImpl::shared_ptr consumer;
- if (!find(destination, consumer)) {
- throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
- } else {
- return consumer;
- }
-}
-
-bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
{
- // @todo KAG gsim: shouldn't the consumers map be locked????
- ConsumerImplMap::const_iterator i = consumers.find(destination);
+ ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
- return false;
+ throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+ } else {
+ return *(i->second);
}
- consumer = i->second;
- return true;
}
void SemanticState::setWindowMode(const std::string& destination)
{
- find(destination)->setWindowMode();
+ find(destination).setWindowMode();
}
void SemanticState::setCreditMode(const std::string& destination)
{
- find(destination)->setCreditMode();
+ find(destination).setCreditMode();
}
void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addByteCredit(value);
- c->requestDispatch();
+ ConsumerImpl& c = find(destination);
+ c.addByteCredit(value);
+ c.requestDispatch();
}
void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
- ConsumerImpl::shared_ptr c = find(destination);
- c->addMessageCredit(value);
- c->requestDispatch();
+ ConsumerImpl& c = find(destination);
+ c.addMessageCredit(value);
+ c.requestDispatch();
}
void SemanticState::flush(const std::string& destination)
{
- find(destination)->flush();
+ find(destination).flush();
}
void SemanticState::stop(const std::string& destination)
{
- find(destination)->stop();
+ find(destination).stop();
}
void SemanticState::ConsumerImpl::setWindowMode()
@@ -646,7 +621,6 @@ void SemanticState::ConsumerImpl::setCre
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -656,7 +630,6 @@ void SemanticState::ConsumerImpl::addByt
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
assertClusterSafe();
- if (windowing) windowActive = true;
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -677,8 +650,7 @@ void SemanticState::ConsumerImpl::flush(
{
while(haveCredit() && queue->dispatch(shared_from_this()))
;
- msgCredit = 0;
- byteCredit = 0;
+ stop();
}
void SemanticState::ConsumerImpl::stop()
@@ -686,7 +658,6 @@ void SemanticState::ConsumerImpl::stop()
assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
- windowActive = false;
}
Queue::shared_ptr SemanticState::getQueue(const string& name) const {
@@ -702,7 +673,7 @@ Queue::shared_ptr SemanticState::getQueu
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
+{
return DeliveryRecord::findRange(unacked, first, last);
}
@@ -720,21 +691,14 @@ void SemanticState::release(DeliveryId f
DeliveryRecords::reverse_iterator start(range.end);
DeliveryRecords::reverse_iterator end(range.start);
for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered));
-
- DeliveryRecords::iterator removed =
- remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
- unacked.erase(removed, range.end);
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
- //may need to remove the delivery records as well
- for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) {
- if (i->isRedundant()) i = unacked.erase(i);
- else i++;
- }
+ //need to remove the delivery records as well
+ unacked.erase(range.start, range.end);
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -797,13 +761,13 @@ void SemanticState::accepted(const Seque
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
accumulatedAck.add(commands);
-
+
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
@@ -825,6 +789,7 @@ void SemanticState::accepted(const Seque
}
void SemanticState::completed(const SequenceSet& commands) {
+ assertClusterSafe();
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
@@ -835,6 +800,7 @@ void SemanticState::completed(const Sequ
void SemanticState::attached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -844,6 +810,7 @@ void SemanticState::attached()
void SemanticState::detached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
session.getConnection().outputTasks.removeOutputTask(i->second.get());
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h Fri Oct 21 01:19:00 2011
@@ -65,7 +65,7 @@ class SessionContext;
*
* Message delivery is driven by ConsumerImpl::doOutput(), which is
* called when a client's socket is ready to write data.
- *
+ *
*/
class SemanticState : private boost::noncopyable {
public:
@@ -75,15 +75,14 @@ class SemanticState : private boost::non
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
+ const std::string name;
const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
bool blocked;
bool windowing;
- bool windowActive;
bool exclusive;
std::string resumeId;
- const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -100,16 +99,15 @@ class SemanticState : private boost::non
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
- ConsumerImpl(SemanticState* parent,
+ ConsumerImpl(SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
- const std::string& tag, const std::string& resumeId,
- uint64_t resumeTtl, const framing::FieldTable& arguments);
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
- bool deliver(QueuedMessage& msg);
- bool filter(boost::intrusive_ptr<Message> msg);
- bool accept(boost::intrusive_ptr<Message> msg);
+ bool deliver(QueuedMessage& msg);
+ bool filter(boost::intrusive_ptr<Message> msg);
+ bool accept(boost::intrusive_ptr<Message> msg);
void disableNotify();
void enableNotify();
@@ -124,13 +122,15 @@ class SemanticState : private boost::non
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void complete(DeliveryRecord&);
+ void complete(DeliveryRecord&);
boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
bool doOutput();
+ std::string getName() const { return name; }
+
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
@@ -138,7 +138,6 @@ class SemanticState : private boost::non
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
std::string getResumeId() const { return resumeId; };
- const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
const framing::FieldTable& getArguments() const { return arguments; }
@@ -149,10 +148,9 @@ class SemanticState : private boost::non
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
- typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
-
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
DeliveryAdapter& deliveryAdapter;
@@ -165,6 +163,7 @@ class SemanticState : private boost::non
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
boost::shared_ptr<Exchange> cacheExchange;
+ AclModule* acl;
const bool authMsg;
const std::string userID;
const std::string userName;
@@ -182,16 +181,14 @@ class SemanticState : private boost::non
void disable(ConsumerImpl::shared_ptr);
public:
-
SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
SessionContext& getSession() { return session; }
const SessionContext& getSession() const { return session; }
- const ConsumerImpl::shared_ptr find(const std::string& destination) const;
- bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
-
+ ConsumerImpl& find(const std::string& destination);
+
/**
* Get named queue, never returns 0.
* @return: named queue
@@ -199,16 +196,16 @@ class SemanticState : private boost::non
* @exception: ConnectionException if name="" and session has no default.
*/
boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-
+
bool exists(const std::string& consumerTag);
- void consume(const std::string& destination,
- boost::shared_ptr<Queue> queue,
+ void consume(const std::string& destination,
+ boost::shared_ptr<Queue> queue,
bool ackRequired, bool acquire, bool exclusive,
const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
const framing::FieldTable& = framing::FieldTable());
- bool cancel(const std::string& tag);
+ void cancel(const std::string& tag);
void setWindowMode(const std::string& destination);
void setCreditMode(const std::string& destination);
@@ -221,13 +218,12 @@ class SemanticState : private boost::non
void commit(MessageStore* const store);
void rollback();
void selectDtx();
- bool getDtxSelected() const { return dtxSelected; }
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void recover(bool requeue);
- void deliver(DeliveryRecord& message, bool sync);
+ void deliver(DeliveryRecord& message, bool sync);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
@@ -248,12 +244,9 @@ class SemanticState : private boost::non
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
- DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
- void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
- DtxBufferMap& getSuspendedXids() { return suspendedXids; }
};
}} // namespace qpid::broker
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/management/ManagementAgent.h"
-#include "qpid/broker/SessionState.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -65,56 +64,53 @@ void SessionAdapter::ExchangeHandlerImpl
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
+ }
+
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- //TODO: why does a passive declare require create
- //permission? The purpose of the passive flag is to state
- //that the exchange should *not* created. For
- //authorisation a passive declare is similar to
- //exchange-query.
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_TYPE, type));
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
- params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) )
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
- }
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
- }else{
+ }else{
if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
}
try{
- std::pair<Exchange::shared_ptr, bool> response =
- getBroker().createExchange(exchange, type, durable, alternateExchange, args,
- getConnection().getUserId(), getConnection().getUrl());
- if (!response.second) {
- //exchange already there, not created
+ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
+ if (response.second) {
+ if (alternate) {
+ response.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ if (durable) {
+ getBroker().getStore().create(*response.first, args);
+ }
+ } else {
checkType(response.first, type);
checkAlternate(response.first, alternate);
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
- getConnection().getUserId(),
- exchange,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(args),
- "existing"));
}
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
+ alternateExchange, durable, false, ManagementAgent::toMap(args),
+ response.second ? "created" : "existing"));
+
}catch(UnknownExchangeTypeException& /*e*/){
- throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
+ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
}
}
@@ -138,8 +134,22 @@ void SessionAdapter::ExchangeHandlerImpl
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
- //TODO: implement if-unused
- getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
+ }
+
+ //TODO: implement unused
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
+ if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+ getBroker().getExchanges().destroy(name);
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -159,19 +169,67 @@ ExchangeQueryResult SessionAdapter::Exch
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
- const FieldTable& arguments)
+ const string& exchangeName, const string& routingKey,
+ const FieldTable& arguments)
{
- getBroker().bind(queueName, exchangeName, routingKey, arguments,
- getConnection().getUserId(), getConnection().getUrl());
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms))
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
+ }
+
+ Queue::shared_ptr queue = getQueue(queueName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+ if(exchange){
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
+ queue->bound(exchangeName, routingKey, arguments);
+ if (exchange->isDurable() && queue->isDurable()) {
+ getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
+ }
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+ queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
+ }
+ }else{
+ throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
+ }
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- getBroker().unbind(queueName, exchangeName, routingKey,
- getConnection().getUserId(), getConnection().getUrl());
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
+ }
+
+ Queue::shared_ptr queue = getQueue(queueName);
+ if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+ if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+ //TODO: revise unbind to rely solely on binding key (not args)
+ if (exchange->unbind(queue, routingKey, 0)) {
+ if (exchange->isDurable() && queue->isDurable())
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
+ }
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -274,42 +332,52 @@ QueueQueryResult SessionAdapter::QueueHa
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+ params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+ }
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = getBroker().getExchanges().get(alternateExchange);
+ }
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- AclModule* acl = getBroker().getAcl();
- if (acl) {
- //TODO: why does a passive declare require create
- //permission? The purpose of the passive flag is to state
- //that the queue should *not* created. For
- //authorisation a passive declare is similar to
- //queue-query (or indeed a qmf query).
- std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
- params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
- params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
- params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
- params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
- if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
- throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
- }
- queue = getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().createQueue(name, durable,
- autoDelete,
- exclusive ? &session : 0,
- alternateExchange,
- arguments,
- getConnection().getUserId(),
- getConnection().getUrl());
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().getQueues().declare(name, durable,
+ autoDelete,
+ exclusive ? &session : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
+ if (alternate) {
+ queue->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
+ //apply settings & create persistent record if required
+ try { queue_created.first->create(arguments); }
+ catch (...) { getBroker().getQueues().destroy(name); throw; }
+
+ //add default binding:
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
+
//handle automatic cleanup:
if (exclusive) {
exclusiveQueues.push_back(queue);
@@ -318,20 +386,21 @@ void SessionAdapter::QueueHandlerImpl::d
if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
- "existing"));
}
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+ queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(&session))
+ if (exclusive && !queue->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
-}
-
+}
+
+
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
@@ -340,32 +409,40 @@ void SessionAdapter::QueueHandlerImpl::p
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
-}
+}
+
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
-{
- if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
+ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
+ }
+
+ Queue::shared_ptr q = getQueue(queue);
+ if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
- << queue->getName() << "; it is exclusive to another session"));
- } else if(ifEmpty && queue->getMessageCount() > 0) {
- throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
- << queue->getName() << "; queue not empty"));
- } else if(ifUnused && queue->getConsumerCount() > 0) {
- throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
- << queue->getName() << "; queue in use"));
- } else if (queue->isExclusiveOwner(&session)) {
+ << queue << "; it is exclusive to another session"));
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw PreconditionFailedException("Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw PreconditionFailedException("Queue in use.");
+ }else{
//remove the queue from the list of exclusive queues if necessary
- QueueVector::iterator i = std::find(exclusiveQueues.begin(),
- exclusiveQueues.end(),
- queue);
- if (i < exclusiveQueues.end()) exclusiveQueues.erase(i);
- }
-}
-
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
-{
- getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
- boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
+ if(q->isExclusiveOwner(&getConnection())){
+ QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
+ if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+ }
+ q->destroy();
+ getBroker().getQueues().destroy(queue);
+ q->unbind(getBroker().getExchanges(), q);
+
+ ManagementAgent* agent = getBroker().getManagementAgent();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
+ q->notifyDeleted();
+ }
}
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
@@ -431,9 +508,7 @@ SessionAdapter::MessageHandlerImpl::subs
void
SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
- if (!state.cancel(destination)) {
- throw NotFoundException(QPID_MSG("No such subscription: " << destination));
- }
+ state.cancel(destination);
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
@@ -512,12 +587,7 @@ framing::MessageResumeResult SessionAdap
-void SessionAdapter::ExecutionHandlerImpl::sync()
-{
- session.addPendingExecutionSync();
- /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */
-
-}
+void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h Fri Oct 21 01:19:00 2011
@@ -138,7 +138,6 @@ class Queue;
bool isLocal(const ConnectionToken* t) const;
void destroyExclusiveQueues();
- void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
template <class F> void eachExclusiveQueue(F f)
{
std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h Fri Oct 21 01:19:00 2011
@@ -46,7 +46,6 @@ class SessionContext : public OwnershipT
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
- virtual void addPendingExecutionSync() = 0;
};
}} // namespace qpid::broker
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp Fri Oct 21 01:19:00 2011
@@ -40,6 +40,11 @@ SessionHandler::SessionHandler(Connectio
SessionHandler::~SessionHandler() {}
+namespace {
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
+} // namespace
+
void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
// NOTE: must tell the error listener _before_ calling connection.close()
if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp Fri Oct 21 01:19:00 2011
@@ -25,7 +25,6 @@
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -61,9 +60,9 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore()),
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
- rateFlowcontrol(0),
- asyncCommandCompleter(new AsyncCommandCompleter(this))
+ rateFlowcontrol(0)
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -96,7 +95,6 @@ void SessionState::addManagementObject()
}
SessionState::~SessionState() {
- asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -127,7 +125,6 @@ bool SessionState::isLocal(const Connect
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
- asyncCommandCompleter->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
@@ -148,7 +145,6 @@ void SessionState::attach(SessionHandler
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
- asyncCommandCompleter->attached();
}
void SessionState::abort() {
@@ -206,17 +202,15 @@ Manageable::status_t SessionState::Manag
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
- currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
- if (currentCommandComplete) receiverCompleted(id);
-
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
-
- if (method->isSync() && currentCommandComplete) {
+ if (method->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
}
}
@@ -259,14 +253,23 @@ void SessionState::handleContent(AMQFram
header.setEof(false);
msg->getFrames().append(header);
}
- if (broker.isTimestamping())
- msg->setTimestamp();
msg->setPublisher(&getConnection());
- msg->getIngressCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
- IncompleteIngressMsgXfer xfer(this, msg);
- msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
+
+ if (msg->isEnqueueComplete()) {
+ enqueued(msg);
+ } else {
+ incomplete.add(msg);
+ }
+
+ //hold up execution until async enqueue is complete
+ if (msg->getFrames().getMethod()->isSync()) {
+ incomplete.process(enqueuedOp, true);
+ sendAcceptAndCompletion();
+ } else {
+ incomplete.process(enqueuedOp, false);
+ }
}
// Handle producer session flow control
@@ -316,41 +319,11 @@ void SessionState::sendAcceptAndCompleti
sendCompletion();
}
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc). At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
- */
-void SessionState::completeRcvMsg(SequenceNumber id,
- bool requiresAccept,
- bool requiresSync)
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
- // Mark this as a cluster-unsafe scope since it can be called in
- // journal threads or connection threads as part of asynchronous
- // command completion.
- sys::ClusterUnsafeScope cus;
-
- bool callSendCompletion = false;
- receiverCompleted(id);
- if (requiresAccept)
- // will cause msg's seq to appear in the next message.accept we send.
- accepted.add(id);
-
- // Are there any outstanding Execution.Sync commands pending the
- // completion of this msg? If so, complete them.
- while (!pendingExecutionSyncs.empty() &&
- receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
- const SequenceNumber id = pendingExecutionSyncs.front();
- pendingExecutionSyncs.pop();
- QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
- receiverCompleted(id);
- callSendCompletion = true; // likely peer is pending for this completion.
- }
-
- // if the sender has requested immediate notification of the completion...
- if (requiresSync || callSendCompletion) {
- sendAcceptAndCompletion();
- }
+ receiverCompleted(msg->getCommandId());
+ if (msg->requiresAccept())
+ accepted.add(msg->getCommandId());
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -423,176 +396,4 @@ framing::AMQP_ClientProxy& SessionState:
return handler->getClusterOrderProxy();
}
-
-// Current received command is an execution.sync command.
-// Complete this command only when all preceding commands have completed.
-// (called via the invoker() in handleCommand() above)
-void SessionState::addPendingExecutionSync()
-{
- SequenceNumber syncCommandId = receiverGetCurrent();
- if (receiverGetIncomplete().front() < syncCommandId) {
- currentCommandComplete = false;
- pendingExecutionSyncs.push(syncCommandId);
- asyncCommandCompleter->flushPendingMessages();
- QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
- }
-}
-
-
-/** factory for creating a reference-counted IncompleteIngressMsgXfer object
- * which will be attached to a message that will be completed asynchronously.
- */
-boost::intrusive_ptr<AsyncCompletion::Callback>
-SessionState::IncompleteIngressMsgXfer::clone()
-{
- // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
- // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
- if (requiresSync)
- msg->flush();
- else {
- // otherwise, we need to track this message in order to flush it if an execution.sync arrives
- // before it has been completed (see flushPendingMessages())
- pending = true;
- completerContext->addPendingMessage(msg);
- }
-
- return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
-}
-
-
-/** Invoked by the asynchronous completer associated with a received
- * msg that is pending Completion. May be invoked by the IO thread
- * (sync == true), or some external thread (!sync).
- */
-void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
-{
- if (pending) completerContext->deletePendingMessage(id);
- if (!sync) {
- /** note well: this path may execute in any thread. It is safe to access
- * the scheduledCompleterContext, since *this has a shared pointer to it.
- * but not session!
- */
- session = 0;
- QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
- } else {
- // this path runs directly from the ac->end() call in handleContent() above,
- // so *session is definately valid.
- if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
- }
- }
- completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
-}
-
-
-/** Scheduled from an asynchronous command's completed callback to run on
- * the IO thread.
- */
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
-{
- ctxt->completeCommands();
-}
-
-
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
- bool unique = pendingMsgs.insert(item).second;
- if (!unique) {
- assert(false);
- }
-}
-
-
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.erase(id);
-}
-
-
-/** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
-{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
- }
- // drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
- i != copy.end(); ++i) {
- i->second->flush();
- }
-}
-
-
-/** mark an ingress Message.Transfer command as completed.
- * This method must be thread safe - it may run on any thread.
- */
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
- if (session && isAttached) {
- MessageInfo msg(cmd, requiresAccept, requiresSync);
- completedMsgs.push_back(msg);
- if (completedMsgs.size() == 1) {
- session->getConnection().requestIOProcessing(boost::bind(&schedule,
- session->asyncCommandCompleter));
- }
- }
-}
-
-
-/** Cause the session to complete all completed commands.
- * Executes on the IO thread.
- */
-void SessionState::AsyncCommandCompleter::completeCommands()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
- // when session is destroyed, it clears the session pointer via cancel().
- if (session && session->isAttached()) {
- for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
- msg != completedMsgs.end(); ++msg) {
- session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
- }
- }
- completedMsgs.clear();
-}
-
-
-/** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- session = 0;
-}
-
-
-/** inform the completer that the session has attached,
- * allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- isAttached = true;
-}
-
-
-/** inform the completer that the session has detached,
- * disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- isAttached = false;
-}
-
}} // namespace qpid::broker
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h Fri Oct 21 01:19:00 2011
@@ -30,15 +30,13 @@
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IncompleteMessageList.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
-#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <set>
#include <vector>
@@ -125,10 +123,6 @@ class SessionState : public qpid::Sessio
const SessionId& getSessionId() const { return getId(); }
- // Used by ExecutionHandler sync command processing. Notifies
- // the SessionState of a received Execution.Sync command.
- void addPendingExecutionSync();
-
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
@@ -136,10 +130,7 @@ class SessionState : public qpid::Sessio
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-
- // indicate that the given ingress msg has been completely received by the
- // broker, and the msg's message.transfer command can be considered completed.
- void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+ void enqueued(boost::intrusive_ptr<Message> msg);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -165,6 +156,8 @@ class SessionState : public qpid::Sessio
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
+ IncompleteMessageList incomplete;
+ IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
@@ -173,110 +166,6 @@ class SessionState : public qpid::Sessio
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
- // sequence numbers for pending received Execution.Sync commands
- std::queue<SequenceNumber> pendingExecutionSyncs;
- bool currentCommandComplete;
-
- /** This class provides a context for completing asynchronous commands in a thread
- * safe manner. Asynchronous commands save their completion state in this class.
- * This class then schedules the completeCommands() method in the IO thread.
- * While running in the IO thread, completeCommands() may safely complete all
- * saved commands without the risk of colliding with other operations on this
- * SessionState.
- */
- class AsyncCommandCompleter : public RefCounted {
- private:
- SessionState *session;
- bool isAttached;
- qpid::sys::Mutex completerLock;
-
- // special-case message.transfer commands for optimization
- struct MessageInfo {
- SequenceNumber cmd; // message.transfer command id
- bool requiresAccept;
- bool requiresSync;
- MessageInfo(SequenceNumber c, bool a, bool s)
- : cmd(c), requiresAccept(a), requiresSync(s) {}
- };
- std::vector<MessageInfo> completedMsgs;
- // If an ingress message does not require a Sync, we need to
- // hold a reference to it in case an Execution.Sync command is received and we
- // have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
-
- /** complete all pending commands, runs in IO thread */
- void completeCommands();
-
- /** for scheduling a run of "completeCommands()" on the IO thread */
- static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
- public:
- AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
- ~AsyncCommandCompleter() {};
-
- /** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
- void deletePendingMessage(SequenceNumber id);
- void flushPendingMessages();
- /** schedule the processing of a completed ingress message.transfer command */
- void scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync);
- void cancel(); // called by SessionState destructor.
- void attached(); // called by SessionState on attach()
- void detached(); // called by SessionState on detach()
- };
- boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
-
- /** Abstract class that represents a single asynchronous command that is
- * pending completion.
- */
- class AsyncCommandContext : public AsyncCompletion::Callback
- {
- public:
- AsyncCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), completerContext(ss->asyncCommandCompleter) {}
- virtual ~AsyncCommandContext() {}
-
- protected:
- SequenceNumber id;
- boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
- };
-
- /** incomplete Message.transfer commands - inbound to broker from client
- */
- class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
- {
- public:
- IncompleteIngressMsgXfer( SessionState *ss,
- boost::intrusive_ptr<Message> m )
- : AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
- IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
- : AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
-
- virtual ~IncompleteIngressMsgXfer() {};
-
- virtual void completed(bool);
- virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
-
- private:
- SessionState *session; // only valid if sync flag in callback is true
- boost::intrusive_ptr<Message> msg;
- bool requiresAccept;
- bool requiresSync;
- bool pending; // true if msg saved on pending list...
- };
-
friend class SessionManager;
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp Fri Oct 21 01:19:00 2011
@@ -28,52 +28,6 @@
namespace qpid {
namespace broker {
-namespace {
-const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
-{
- const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
- return props && props->getAppId() == "qmf2";
-}
-
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
-{
- if (message->getIsManagementMessage()) {
- //is this a qmf event? if so is it a threshold event?
- if (isQMFv2(message)) {
- const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
- if (headers && headers->getAsString("qmf.content") == "_event") {
- //decode as list
- std::string content = message->getFrames().getContent();
- qpid::types::Variant::List list;
- qpid::amqp_0_10::ListCodec::decode(content, list);
- if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
- qpid::types::Variant::Map map = list.front().asMap();
- try {
- std::string eventName = map["_schema_id"].asMap()["_class_name"].asString();
- return eventName == EVENT.getEventName();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Error checking for recursive threshold alert: " << e.what());
- }
- }
- } else {
- std::string content = message->getFrames().getContent();
- qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
- if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
- buffer.getLong();//sequence
- std::string packageName;
- buffer.getShortString(packageName);
- if (packageName != EVENT.getPackageName()) return false;
- std::string eventName;
- buffer.getShortString(eventName);
- return eventName == EVENT.getEventName();
- }
- }
- }
- return false;
-}
-}
-
ThresholdAlerts::ThresholdAlerts(const std::string& n,
qpid::management::ManagementAgent& a,
const uint32_t ct,
@@ -90,14 +44,8 @@ void ThresholdAlerts::enqueued(const Que
if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
|| qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
- //Note: Raising an event may result in messages being
- //enqueued on queues; it may even be that this event
- //causes a message to be enqueued on the queue we are
- //tracking, and so we need to avoid recursing
- if (isThresholdEvent(m.payload)) return;
- lastAlert = qpid::sys::now();
agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
- QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
+ lastAlert = qpid::sys::now();
}
}
}
@@ -127,12 +75,12 @@ void ThresholdAlerts::observe(Queue& que
}
void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio)
+ const qpid::framing::FieldTable& settings)
{
qpid::types::Variant::Map map;
qpid::amqp_0_10::translate(settings, map);
- observe(queue, agent, map, limitRatio);
+ observe(queue, agent, map);
}
template <class T>
@@ -170,19 +118,19 @@ class Option
};
void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio)
+ const qpid::types::Variant::Map& settings)
{
//Note: aliases are keys defined by java broker
Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
- //If no explicit threshold settings were given use specified
- //percentage of any limit from the policy.
+ //If no explicit threshold settings were given use 80% of any
+ //limit from the policy.
const QueuePolicy* policy = queue.getPolicy();
- Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
+ Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0));
countThreshold.addAlias("x-qpid-maximum-message-count");
- Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
+ Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0));
sizeThreshold.addAlias("x-qpid-maximum-message-size");
observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h Fri Oct 21 01:19:00 2011
@@ -50,17 +50,14 @@ class ThresholdAlerts : public QueueObse
const long repeatInterval);
void enqueued(const QueuedMessage&);
void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {};
- void requeued(const QueuedMessage&) {};
-
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
const long repeatInterval);
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio);
+ const qpid::framing::FieldTable& settings);
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+ const qpid::types::Variant::Map& settings);
private:
const std::string name;
qpid::management::ManagementAgent& agent;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp Fri Oct 21 01:19:00 2011
@@ -221,7 +221,6 @@ TopicExchange::TopicExchange(const std::
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
{
- ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
string fedTags(args ? args->getAsString(qpidFedTags) : "");
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -250,21 +249,21 @@ bool TopicExchange::bind(Queue::shared_p
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
- QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName()
- << " on exchange " << getName() << " (origin=" << fedOrigin << ")");
+ QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName()
+ << " (origin=" << fedOrigin << ")");
}
} else if (fedOp == fedOpUnbind) {
- RWlock::ScopedWlock l(lock);
- BindingKey* bk = getQueueBinding(queue, routingPattern);
- if (bk) {
- QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName()
- << " on queue=" << queue->getName() << " origin=" << fedOrigin);
- propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
- // if this was the last binding for the queue, delete the binding
- if (bk->fedBinding.countFedBindings(queue->getName()) == 0) {
- deleteBinding(queue, routingPattern, bk);
+ bool reallyUnbind = false;
+ {
+ RWlock::ScopedWlock l(lock);
+ BindingKey* bk = bindingTree.getBindingKey(routingPattern);
+ if (bk) {
+ propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+ reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0;
}
}
+ if (reallyUnbind)
+ unbind(queue, routingPattern, 0);
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
* while holding the lock. Then propagate once the lock is
@@ -282,38 +281,20 @@ bool TopicExchange::bind(Queue::shared_p
}
}
- cc.clearCache(); // clear the cache before we IVE route.
routeIVE();
if (propagate)
propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
-bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
-{
- string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
- QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
- << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
-
- ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
string routingKey = normalize(constRoutingKey);
- BindingKey* bk = getQueueBinding(queue, routingKey);
+ BindingKey* bk = bindingTree.getBindingKey(routingKey);
if (!bk) return false;
- bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
- deleteBinding(queue, routingKey, bk);
- if (propagate)
- propagateFedOp(routingKey, string(), fedOpUnbind, string());
- return true;
-}
-
-
-bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
- const std::string& routingKey,
- BindingKey *bk)
-{
- // Note well: write lock held by caller
Binding::vector& qv(bk->bindingVector);
+ bool propagate = false;
+
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
@@ -322,55 +303,42 @@ bool TopicExchange::deleteBinding(Queue:
qv.erase(q);
assert(nBindings > 0);
nBindings--;
-
+ propagate = bk->fedBinding.delOrigin();
if(qv.empty()) {
bindingTree.removeBindingKey(routingKey);
}
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
}
- QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName()
- << " on exchange " << getName());
+ QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName());
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
}
-/** returns a pointer to the BindingKey if the given queue is bound to this
- * exchange using the routing pattern. 0 if queue binding does not exist.
- */
-TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern)
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
{
// Note well: lock held by caller....
BindingKey *bk = bindingTree.getBindingKey(pattern); // Exact match against binding pattern
- if (!bk) return 0;
+ if (!bk) return false;
Binding::vector& qv(bk->bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
break;
- return (q != qv.end()) ? bk : 0;
+ return q != qv.end();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
// Note: PERFORMANCE CRITICAL!!!
- BindingList b;
- std::map<std::string, BindingList>::iterator it;
- { // only lock the cache for read
- RWlock::ScopedRlock cl(cacheLock);
- it = bindingCache.find(routingKey);
- if (it != bindingCache.end()) {
- b = it->second;
- }
- }
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- if (!b.get()) // no cache hit
+ BindingsFinderIter bindingsFinder(b);
{
RWlock::ScopedRlock l(lock);
- b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
- BindingsFinderIter bindingsFinder(b);
bindingTree.iterateMatch(routingKey, bindingsFinder);
- RWlock::ScopedWlock cwl(cacheLock);
- bindingCache[routingKey] = b; // update cache
}
doRoute(msg, b);
}
@@ -380,7 +348,7 @@ bool TopicExchange::isBound(Queue::share
RWlock::ScopedRlock l(lock);
if (routingKey && queue) {
string key(normalize(*routingKey));
- return getQueueBinding(queue, key) != 0;
+ return isBound(queue, key);
} else if (!routingKey && !queue) {
return nBindings > 0;
} else if (routingKey) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h Fri Oct 21 01:19:00 2011
@@ -56,7 +56,7 @@ class TopicExchange : public virtual Exc
// | +-->d-->...
// +-->x-->y-->...
//
- class QPID_BROKER_CLASS_EXTERN BindingNode {
+ class BindingNode {
public:
typedef boost::shared_ptr<BindingNode> shared_ptr;
@@ -135,31 +135,8 @@ class TopicExchange : public virtual Exc
BindingNode bindingTree;
unsigned long nBindings;
qpid::sys::RWlock lock; // protects bindingTree and nBindings
- qpid::sys::RWlock cacheLock; // protects cache
- std::map<std::string, BindingList> bindingCache; // cache of matched routes.
- class ClearCache {
- private:
- qpid::sys::RWlock* cacheLock;
- std::map<std::string, BindingList>* bindingCache;
- bool cleared;
- public:
- ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
- bindingCache(bc),cleared(false) {};
- void clearCache() {
- qpid::sys::RWlock::ScopedWlock l(*cacheLock);
- if (!cleared) {
- bindingCache->clear();
- cleared =true;
- }
- };
- ~ClearCache(){
- clearCache();
- };
- };
- BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
- bool deleteBinding(Queue::shared_ptr queue,
- const std::string& routingKey,
- BindingKey *bk);
+
+ bool isBound(Queue::shared_ptr queue, const std::string& pattern);
class ReOriginIter;
class BindingsFinderIter;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp Fri Oct 21 01:19:00 2011
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(Transactional
}
void TxBuffer::accept(TxOpConstVisitor& v) const {
- std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
+ std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp Fri Oct 21 01:19:00 2011
@@ -90,7 +90,14 @@ void TxPublish::deliverTo(const boost::s
void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
{
- queue->enqueue(ctxt, msg);
+ if (!queue->enqueue(ctxt, msg)){
+ /**
+ * if not store then mark message for ack and deleivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
+ msg->enqueueComplete();
+ }
}
TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org