You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC
svn commit: r1377715 [2/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/
cpp/src/qpid/asyncStore/ cpp/src/qpid...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h Mon Aug 27 15:40:33 2012
@@ -21,21 +21,23 @@
#ifndef _Consumer_
#define _Consumer_
-#include "qpid/broker/Message.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/OwnershipToken.h"
+#include <boost/shared_ptr.hpp>
+#include <string>
namespace qpid {
namespace broker {
+class DeliveryRecord;
+class Message;
class Queue;
class QueueListeners;
/**
* Base class for consumers which represent a subscription to a queue.
*/
-class Consumer
-{
+class Consumer : public QueueCursor {
const bool acquires;
// inListeners allows QueueListeners to efficiently track if this
// instance is registered for notifications without having to
@@ -47,22 +49,17 @@ class Consumer
public:
typedef boost::shared_ptr<Consumer> shared_ptr;
- Consumer(const std::string& _name, bool preAcquires = true)
- : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
+ Consumer(const std::string& _name, SubscriptionType type)
+ : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {}
virtual ~Consumer(){}
bool preAcquires() const { return acquires; }
const std::string& getName() const { return name; }
- /**@return the position of the last message seen by this consumer */
- virtual framing::SequenceNumber getPosition() const { return position; }
-
- virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
-
- virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual bool deliver(const QueueCursor& cursor, const Message& msg) = 0;
virtual void notify() = 0;
- virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
- virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual bool filter(const Message&) { return true; }
+ virtual bool accept(const Message&) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual void cancel() = 0;
@@ -75,7 +72,7 @@ class Consumer
* Not to be confused with accept() above, which is asking if
* this consumer will consume/browse the message.
*/
- virtual void acknowledged(const QueuedMessage&) = 0;
+ virtual void acknowledged(const DeliveryRecord&) = 0;
/** Called if queue has been deleted, if true suppress the error message.
* Used by HA ReplicatingSubscriptions where such errors are normal.
@@ -83,7 +80,7 @@ class Consumer
virtual bool hideDeletedError() { return false; }
protected:
- framing::SequenceNumber position;
+ //framing::SequenceNumber position;
private:
friend class QueueListeners;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Deliverable.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Deliverable.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Deliverable.h Mon Aug 27 15:40:33 2012
@@ -21,17 +21,22 @@
#ifndef _Deliverable_
#define _Deliverable_
-#include "qpid/broker/Message.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
- class Deliverable{
+ class Message;
+ class Queue;
+
+ class Deliverable : public AsyncCompletion {
public:
bool delivered;
Deliverable() : delivered(false) {}
virtual Message& getMessage() = 0;
-
+
virtual void deliverTo(const boost::shared_ptr<Queue>& queue) = 0;
virtual uint64_t contentSize() { return 0; }
virtual ~Deliverable(){}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp Mon Aug 27 15:40:33 2012
@@ -24,22 +24,20 @@
using namespace qpid::broker;
-DeliverableMessage::DeliverableMessage(const boost::intrusive_ptr<Message>& _msg) : msg(_msg)
-{
-}
+DeliverableMessage::DeliverableMessage(const Message& _msg, TxBuffer* _txn) : msg(_msg), txn(_txn) {}
void DeliverableMessage::deliverTo(const boost::shared_ptr<Queue>& queue)
{
- queue->deliver(msg);
+ queue->deliver(msg, txn);
delivered = true;
}
Message& DeliverableMessage::getMessage()
{
- return *msg;
+ return msg;
}
-uint64_t DeliverableMessage::contentSize ()
+uint64_t DeliverableMessage::contentSize()
{
- return msg->contentSize ();
+ return msg.getContentSize();
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h Mon Aug 27 15:40:33 2012
@@ -25,14 +25,15 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
-#include <boost/intrusive_ptr.hpp>
-
namespace qpid {
namespace broker {
- class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable{
- boost::intrusive_ptr<Message> msg;
+ class TxBuffer;
+ class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable
+ {
+ Message msg;
+ TxBuffer* txn;
public:
- QPID_BROKER_EXTERN DeliverableMessage(const boost::intrusive_ptr<Message>& msg);
+ QPID_BROKER_EXTERN DeliverableMessage(const Message& msg, TxBuffer* txn);
QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
QPID_BROKER_EXTERN Message& getMessage();
QPID_BROKER_EXTERN uint64_t contentSize();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryAdapter.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryAdapter.h Mon Aug 27 15:40:33 2012
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DeliveryAdapter_
-#define _DeliveryAdapter_
-
-#include "qpid/broker/DeliveryId.h"
-#include "qpid/broker/Message.h"
-#include "qpid/framing/amqp_types.h"
-
-namespace qpid {
-namespace broker {
-
-class DeliveryRecord;
-
-/**
- * The intention behind this interface is to separate the generic
- * handling of some form of message delivery to clients that is
- * contained in the version independent Channel class from the
- * details required for a particular situation or
- * version. i.e. where the existing adapters allow (through
- * supporting the generated interface for a version of the
- * protocol) inputs of a channel to be adapted to the version
- * independent part, this does the same for the outputs.
- */
-class DeliveryAdapter
-{
- public:
- virtual void deliver(DeliveryRecord&, bool sync) = 0;
- virtual ~DeliveryAdapter(){}
-};
-
-}}
-
-
-#endif
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Aug 27 15:40:33 2012
@@ -24,6 +24,7 @@
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -32,77 +33,46 @@ using namespace qpid;
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueueCursor& _msg,
+ framing::SequenceNumber _msgId,
const Queue::shared_ptr& _queue,
const std::string& _tag,
const boost::shared_ptr<Consumer>& _consumer,
bool _acquired,
bool accepted,
bool _windowing,
- uint32_t _credit):
- msg(_msg),
- queue(_queue),
- tag(_tag),
- consumer(_consumer),
- acquired(_acquired),
- acceptExpected(!accepted),
- cancelled(false),
- completed(false),
- ended(accepted && acquired),
- windowing(_windowing),
- credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
+ uint32_t _credit) : msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ consumer(_consumer),
+ acquired(_acquired),
+ acceptExpected(!accepted),
+ cancelled(false),
+ completed(false),
+ ended(accepted && acquired),
+ windowing(_windowing),
+ credit(_credit),
+ msgId(_msgId)
{}
bool DeliveryRecord::setEnded()
{
ended = true;
- //reset msg pointer, don't need to hold on to it anymore
- msg.payload = boost::intrusive_ptr<Message>();
QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
return isRedundant();
}
-void DeliveryRecord::redeliver(SemanticState* const session) {
- if (!ended) {
- if(cancelled){
- //if subscription was cancelled, requeue it (waiting for
- //final confirmation for AMQP WG on this case)
- requeue();
- }else{
- msg.payload->redeliver();//mark as redelivered
- session->deliver(*this, false);
- }
- }
-}
-
-void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize)
-{
- id = deliveryId;
- if (msg.payload->getRedelivered()){
- msg.payload->setRedelivered();
- }
- msg.payload->adjustTtl();
-
- framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1)));
- method.setEof(false);
- h.handle(method);
- msg.payload->sendHeader(h, framesize);
- msg.payload->sendContent(*queue, h, framesize);
-}
-
-void DeliveryRecord::requeue() const
+void DeliveryRecord::requeue()
{
if (acquired && !ended) {
- msg.payload->redeliver();
- queue->requeue(msg);
+ queue->release(msg);
}
}
void DeliveryRecord::release(bool setRedelivered)
{
if (acquired && !ended) {
- if (setRedelivered) msg.payload->redeliver();
- queue->requeue(msg);
+ queue->release(msg, setRedelivered);
acquired = false;
setEnded();
} else {
@@ -110,13 +80,14 @@ void DeliveryRecord::release(bool setRed
}
}
-void DeliveryRecord::complete() {
+void DeliveryRecord::complete()
+{
completed = true;
}
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (!ended) {
- if (consumer) consumer->acknowledged(getMessage());
+ if (consumer) consumer->acknowledged(*this);
if (acquired) queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
@@ -124,31 +95,22 @@ bool DeliveryRecord::accept(TransactionC
return isRedundant();
}
-void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const
+{
if (acquired && !ended) {
queue->dequeue(ctxt, msg);
}
}
-void DeliveryRecord::committed() const{
+void DeliveryRecord::committed() const
+{
queue->dequeueCommitted(msg);
}
void DeliveryRecord::reject()
{
if (acquired && !ended) {
- Exchange::shared_ptr alternate = queue->getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->routeWithAlternate(delivery);
- QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << queue->getName());
- }
- queue->countRejected();
- dequeue();
+ queue->reject(msg);
setEnded();
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliveryRecord.h Mon Aug 27 15:40:33 2012
@@ -26,15 +26,17 @@
#include <deque>
#include <vector>
#include <ostream>
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/DeliveryId.h"
#include "qpid/broker/Message.h"
namespace qpid {
namespace broker {
+class Queue;
class TransactionContext;
class SemanticState;
struct AckRange;
@@ -45,7 +47,7 @@ class Consumer;
*/
class DeliveryRecord
{
- QueuedMessage msg;
+ QueueCursor msg;
mutable boost::shared_ptr<Queue> queue;
std::string tag; // name of consumer
boost::shared_ptr<Consumer> consumer;
@@ -65,9 +67,10 @@ class DeliveryRecord
* after that).
*/
uint32_t credit;
+ framing::SequenceNumber msgId;
public:
- QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
+ QPID_BROKER_EXTERN DeliveryRecord(const QueueCursor& msgCursor, framing::SequenceNumber msgId,
const boost::shared_ptr<Queue>& queue,
const std::string& tag,
const boost::shared_ptr<Consumer>& consumer,
@@ -80,11 +83,10 @@ class DeliveryRecord
bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
void dequeue(TransactionContext* ctxt = 0) const;
- void requeue() const;
+ void requeue();
void release(bool setRedelivered);
void reject();
void cancel(const std::string& tag);
- void redeliver(SemanticState* const);
void acquire(DeliveryIds& results);
void complete();
bool accept(TransactionContext* ctxt); // Returns isRedundant()
@@ -102,13 +104,13 @@ class DeliveryRecord
uint32_t getCredit() const;
const std::string& getTag() const { return tag; }
- void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
typedef std::deque<DeliveryRecord> DeliveryRecords;
static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
- const QueuedMessage& getMessage() const { return msg; }
+ const QueueCursor& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
+ framing::SequenceNumber getMessageId() const { return msgId; }
boost::shared_ptr<Queue> getQueue() const { return queue; }
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxAck.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxAck.h Mon Aug 27 15:40:33 2012
@@ -40,7 +40,6 @@ class DtxAck : public TxOp{
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DtxAck(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
const DeliveryRecords& getPending() const { return pending; }
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp Mon Aug 27 15:40:33 2012
@@ -25,6 +25,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -62,10 +63,10 @@ Exchange::PreRoute::PreRoute(Deliverable
if (parent->sequence){
parent->sequenceNo++;
- msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
+ msg.getMessage().addAnnotation(qpidMsgSequence,parent->sequenceNo);
}
if (parent->ive) {
- parent->lastMsg = &( msg.getMessage());
+ parent->lastMsg = msg.getMessage();
}
}
}
@@ -111,12 +112,6 @@ void Exchange::doRoute(Deliverable& msg,
int count = 0;
if (b.get()) {
- // Block the content release if the message is transient AND there is more than one binding
- if (!msg.getMessage().isPersistent() && b->size() > 1) {
- msg.getMessage().blockContentRelease();
- }
-
-
ExInfo error(getName()); // Save exception to throw at the end.
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
try {
@@ -161,8 +156,8 @@ void Exchange::doRoute(Deliverable& msg,
}
void Exchange::routeIVE(){
- if (ive && lastMsg.get()){
- DeliverableMessage dmsg(lastMsg);
+ if (ive && lastMsg){
+ DeliverableMessage dmsg(lastMsg, 0);
route(dmsg);
}
}
@@ -400,9 +395,9 @@ bool Exchange::MatchQueue::operator()(Ex
return b->queue == queue;
}
-void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
- msg->setExchange(getName());
-}
+//void Exchange::setProperties(Message& msg) {
+// qpid::broker::amqp_0_10::MessageTransfer::setExchange(msg, getName());
+//}
bool Exchange::routeWithAlternate(Deliverable& msg)
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Mon Aug 27 15:40:33 2012
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
@@ -74,7 +75,7 @@ protected:
mutable qpid::sys::Mutex sequenceLock;
int64_t sequenceNo;
bool ive;
- boost::intrusive_ptr<Message> lastMsg;
+ Message lastMsg;
class PreRoute{
public:
@@ -196,7 +197,7 @@ public:
virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
- QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
+ //QPID_BROKER_EXTERN virtual void setProperties(Message&);
virtual void route(Deliverable& msg) = 0;
//PersistableExchange:
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.cpp Mon Aug 27 15:40:33 2012
@@ -27,7 +27,7 @@ namespace broker {
ExpiryPolicy::~ExpiryPolicy() {}
-bool ExpiryPolicy::hasExpired(Message& m) {
+bool ExpiryPolicy::hasExpired(const Message& m) {
return m.getExpiration() < sys::AbsTime::now();
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExpiryPolicy.h Mon Aug 27 15:40:33 2012
@@ -42,7 +42,7 @@ class QPID_BROKER_CLASS_EXTERN ExpiryPol
{
public:
QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
- QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
+ QPID_BROKER_EXTERN virtual bool hasExpired(const Message&);
QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.cpp Mon Aug 27 15:40:33 2012
@@ -19,7 +19,8 @@
*
*/
#include "qpid/broker/Fairshare.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -83,17 +84,6 @@ bool Fairshare::setState(uint p, uint c)
return true;
}
-bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
-{
- const uint start = p = currentLevel();
- do {
- if (!messages[p].empty()) return true;
- } while ((p = nextLevel()) != start);
- return false;
-}
-
-
-
bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
{
const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
@@ -106,82 +96,30 @@ bool Fairshare::setState(Messages& m, ui
return fairshare && fairshare->setState(priority, count);
}
-int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
+PriorityQueue::Priority Fairshare::firstLevel()
{
- qpid::framing::FieldTable::ValuePtr v;
- std::vector<std::string>::const_iterator i = keys.begin();
- while (!v && i != keys.end()) {
- v = settings.get(*i++);
- }
-
- if (!v) {
- return 0;
- } else if (v->convertsTo<int>()) {
- return v->get<int>();
- } else if (v->convertsTo<std::string>()){
- std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
- } catch(const boost::bad_lexical_cast&) {
- QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << s);
- return 0;
- }
- } else {
- QPID_LOG(warning, "Ignoring invalid integer value for " << *i << ": " << *v);
- return 0;
- }
+ return Priority(currentLevel());
}
-int getIntegerSettingForKey(const qpid::framing::FieldTable& settings, const std::string& key)
+bool Fairshare::nextLevel(Priority& p)
{
- return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
-}
-
-int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
-{
- return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));
-}
-
-std::auto_ptr<Fairshare> getFairshareForKey(const qpid::framing::FieldTable& settings, uint levels, const std::string& key)
-{
- uint defaultLimit = getIntegerSettingForKey(settings, key);
- std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
- for (uint i = 0; i < levels; i++) {
- std::string levelKey = (boost::format("%1%-%2%") % key % i).str();
- if(settings.isSet(levelKey)) {
- fairshare->setLimit(i, getIntegerSettingForKey(settings, levelKey));
- }
- }
- if (!fairshare->isNull()) {
- return fairshare;
+ int next = nextLevel();
+ if (next == p.start) {
+ return false;
} else {
- return std::auto_ptr<Fairshare>();
- }
-}
-
-std::auto_ptr<Fairshare> getFairshare(const qpid::framing::FieldTable& settings,
- uint levels,
- const std::vector<std::string>& keys)
-{
- std::auto_ptr<Fairshare> fairshare;
- for (std::vector<std::string>::const_iterator i = keys.begin(); i != keys.end() && !fairshare.get(); ++i) {
- fairshare = getFairshareForKey(settings, levels, *i);
+ p.current = next;
+ return true;
}
- return fairshare;
}
-std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
+std::auto_ptr<Messages> Fairshare::create(const QueueSettings& settings)
{
- using boost::assign::list_of;
- std::auto_ptr<Messages> result;
- size_t levels = getSetting(settings, list_of<std::string>("qpid.priorities")("x-qpid-priorities"), 0, 100);
- if (levels) {
- std::auto_ptr<Fairshare> fairshare =
- getFairshare(settings, levels, list_of<std::string>("qpid.fairshare")("x-qpid-fairshare"));
- if (fairshare.get()) result = fairshare;
- else result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+ std::auto_ptr<Fairshare> fairshare(new Fairshare(settings.priorities, settings.defaultFairshare));
+ for (uint i = 0; i < settings.priorities; i++) {
+ std::map<uint32_t,uint32_t>::const_iterator l = settings.fairshare.find(i);
+ if (l != settings.fairshare.end()) fairshare->setLimit(i, l->second);
}
- return result;
+ return std::auto_ptr<Messages>(fairshare.release());
}
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Fairshare.h Mon Aug 27 15:40:33 2012
@@ -24,13 +24,11 @@
#include "qpid/broker/PriorityQueue.h"
namespace qpid {
-namespace framing {
-class FieldTable;
-}
namespace broker {
+struct QueueSettings;
/**
- * Modifies a basic prioirty queue by limiting the number of messages
+ * Modifies a basic priority queue by limiting the number of messages
* from each priority level that are dispatched before allowing
* dispatch from the next level.
*/
@@ -42,7 +40,7 @@ class Fairshare : public PriorityQueue
bool setState(uint priority, uint count);
void setLimit(size_t level, uint limit);
bool isNull();
- static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
+ static std::auto_ptr<Messages> create(const QueueSettings& settings);
static bool getState(const Messages&, uint& priority, uint& count);
static bool setState(Messages&, uint priority, uint count);
private:
@@ -54,7 +52,8 @@ class Fairshare : public PriorityQueue
uint currentLevel();
uint nextLevel();
bool limitReached();
- bool findFrontLevel(uint& p, PriorityLevels&);
+ Priority firstLevel();
+ bool nextLevel(Priority& );
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.cpp Mon Aug 27 15:40:33 2012
@@ -28,21 +28,14 @@ using namespace qpid::broker;
FifoDistributor::FifoDistributor(Messages& container)
: messages(container) {}
-bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
+bool FifoDistributor::acquire(const std::string&, Message& msg)
{
- return messages.consume(next);
-}
-
-bool FifoDistributor::allocate(const std::string&, const QueuedMessage& )
-{
- // by default, all messages present on the queue may be allocated as they have yet to
- // be acquired.
- return true;
-}
-
-bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
- return messages.browse(c->getPosition(), next, !c->browseAcquired());
+ if (msg.getState() == AVAILABLE) {
+ msg.setState(ACQUIRED);
+ return true;
+ } else {
+ return false;
+ }
}
void FifoDistributor::query(qpid::types::Variant::Map&) const
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FifoDistributor.h Mon Aug 27 15:40:33 2012
@@ -38,15 +38,7 @@ class FifoDistributor : public MessageDi
public:
FifoDistributor(Messages& container);
- /** Locking Note: all methods assume the caller is holding the Queue::messageLock
- * during the method call.
- */
-
- /** MessageDistributor interface */
-
- bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
- bool allocate(const std::string& consumer, const QueuedMessage& target);
- bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ bool acquire(const std::string& consumer, Message& target);
void query(qpid::types::Variant::Map&) const;
private:
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.cpp Mon Aug 27 15:40:33 2012
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/HeadersExchange.h"
+#include "qpid/broker/MapHandler.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -55,6 +56,100 @@ namespace {
const std::string fedOpUnbind("U");
const std::string fedOpReorigin("R");
const std::string fedOpHello("H");
+
+std::string getMatch(const FieldTable* args)
+{
+ if (!args) {
+ throw InternalErrorException(QPID_MSG("No arguments given."));
+ }
+ FieldTable::ValuePtr what = args->get(x_match);
+ if (!what) {
+ return empty;
+ }
+ if (!what->convertsTo<std::string>()) {
+ throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
+ }
+ return what->get<std::string>();
+}
+class Matcher : public MapHandler
+{
+ public:
+ Matcher(const FieldTable& b) : binding(b), matched(0) {}
+ void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { processUint(std::string(key.data, key.size), value); }
+ void handleInt8(const MapHandler::CharSequence& key, int8_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt16(const MapHandler::CharSequence& key, int16_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt32(const MapHandler::CharSequence& key, int32_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleInt64(const MapHandler::CharSequence& key, int64_t value) { processInt(std::string(key.data, key.size), value); }
+ void handleFloat(const MapHandler::CharSequence& key, float value) { processFloat(std::string(key.data, key.size), value); }
+ void handleDouble(const MapHandler::CharSequence& key, double value) { processFloat(std::string(key.data, key.size), value); }
+ void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+ {
+ processString(std::string(key.data, key.size), std::string(value.data, value.size));
+ }
+ void handleVoid(const MapHandler::CharSequence& key)
+ {
+ valueCheckRequired(std::string(key.data, key.size));
+ }
+ bool matches()
+ {
+ std::string what = getMatch(&binding);
+ if (what == all) {
+ //must match all entries in the binding, except the match mode indicator
+ return matched == binding.size() - 1;
+ } else if (what == any) {
+ //match any of the entries in the binding
+ return matched > 0;
+ } else {
+ return false;
+ }
+ }
+ private:
+ bool valueCheckRequired(const std::string& key)
+ {
+ FieldTable::ValuePtr v = binding.get(key);
+ if (v) {
+ if (v->getType() == 0xf0/*VOID*/) {
+ ++matched;
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ void processString(const std::string& key, const std::string& actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsString(key) == actual) {
+ ++matched;
+ }
+ }
+ void processFloat(const std::string& key, double actual)
+ {
+ double bound;
+ if (valueCheckRequired(key) && binding.getDouble(key, bound) && bound == actual) {
+ ++matched;
+ }
+ }
+ void processInt(const std::string& key, int64_t actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsInt64(key) == actual) {
+ ++matched;
+ }
+ }
+ void processUint(const std::string& key, uint64_t actual)
+ {
+ if (valueCheckRequired(key) && binding.getAsUInt64(key) == actual) {
+ ++matched;
+ }
+ }
+ const FieldTable& binding;
+ size_t matched;
+};
}
HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
@@ -72,21 +167,6 @@ HeadersExchange::HeadersExchange(const s
mgmtExchange->set_type (typeName);
}
-std::string HeadersExchange::getMatch(const FieldTable* args)
-{
- if (!args) {
- throw InternalErrorException(QPID_MSG("No arguments given."));
- }
- FieldTable::ValuePtr what = args->get(x_match);
- if (!what) {
- return empty;
- }
- if (!what->convertsTo<std::string>()) {
- throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
- }
- return what->get<std::string>();
-}
-
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
{
string fedOp(fedOpBind);
@@ -196,28 +276,16 @@ bool HeadersExchange::unbind(Queue::shar
void HeadersExchange::route(Deliverable& msg)
{
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
- if (!args) {
- //can't match if there were no headers passed in
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- if (brokerMgmtObject)
- brokerMgmtObject->inc_discardsNoRoute();
- }
- return;
- }
-
PreRoute pr(msg, this);
BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if (match((*i).binding->args, *args)) {
- b->push_back((*i).binding);
+ Matcher matcher(i->binding->args);
+ msg.getMessage().processProperties(matcher);
+ if (matcher.matches()) {
+ b->push_back(i->binding);
}
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/HeadersExchange.h Mon Aug 27 15:40:33 2012
@@ -73,9 +73,6 @@ class HeadersExchange : public virtual E
Bindings bindings;
qpid::sys::Mutex lock;
-
- static std::string getMatch(const framing::FieldTable* args);
-
protected:
void getNonFedArgs(const framing::FieldTable* args,
framing::FieldTable& nonFedArgs);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.cpp Mon Aug 27 15:40:33 2012
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/LegacyLVQ.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/QueuedMessage.h"
-
-namespace qpid {
-namespace broker {
-
-LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
-
-void LegacyLVQ::setNoBrowse(bool b)
-{
- noBrowse = b;
-}
-bool LegacyLVQ::deleted(const QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(message.position);
- if (i != messages.end() && i->second.payload == message.payload) {
- erase(i);
- return true;
- } else {
- return false;
- }
-}
-
-bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
- Ordering::iterator i = messages.find(position);
- if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
- i->second.status = QueuedMessage::ACQUIRED;
- message = i->second;
- return true;
- } else {
- return false;
- }
-}
-
-bool LegacyLVQ::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
-{
- if (MessageMap::browse(position, message, unacquired)) {
- if (!noBrowse) index.erase(getKey(message));
- return true;
- } else {
- return false;
- }
-}
-
-bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
-{
- //Hack to disable LVQ behaviour on cluster update:
- if (broker && broker->isClusterUpdatee()) {
- messages[added.position] = added;
- return false;
- } else {
- return MessageMap::push(added, removed);
- }
-}
-
-const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{
- //add the new message into the original position of the replaced message
- Ordering::iterator i = messages.find(original.position);
- if (i != messages.end()) {
- i->second = update;
- i->second.position = original.position;
- return i->second;
- } else {
- QPID_LOG(error, "Failed to replace message at " << original.position);
- return update;
- }
-}
-
-void LegacyLVQ::removeIf(Predicate p)
-{
- //Note: This method is currently called periodically on the timer
- //thread to expire messages. In a clustered broker this means that
- //the purging does not occur on the cluster event dispatch thread
- //and consequently that is not totally ordered w.r.t other events
- //(including publication of messages). The cluster does ensure
- //that the actual expiration of messages (as distinct from the
- //removing of those expired messages from the queue) *is*
- //consistently ordered w.r.t. cluster events. This means that
- //delivery of messages is in general consistent across the cluster
- //inspite of any non-determinism in the triggering of a
- //purge. However at present purging a last value queue (of the
- //legacy sort) could potentially cause inconsistencies in the
- //cluster (as the order w.r.t publications can affect the order in
- //which messages appear in the queue). Consequently periodic
- //purging of an LVQ is not enabled if the broker is clustered
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
- if (!broker || !broker->isInCluster())
- MessageMap::removeIf(p);
-}
-
-std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current,
- const std::string& key, bool noBrowse, Broker* broker)
-{
- LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
- if (lvq) {
- lvq->setNoBrowse(noBrowse);
- return current;
- } else {
- return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
- }
-}
-
-}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LegacyLVQ.h Mon Aug 27 15:40:33 2012
@@ -1,60 +0,0 @@
-#ifndef QPID_BROKER_LEGACYLVQ_H
-#define QPID_BROKER_LEGACYLVQ_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/MessageMap.h"
-#include <memory>
-
-namespace qpid {
-namespace broker {
-class Broker;
-
-/**
- * This class encapsulates the behaviour of the old style LVQ where a
- * message replacing another messages for the given key will use the
- * position in the queue of the previous message. This however causes
- * problems for browsing. Either browsers stop the coalescing of
- * messages by key (default) or they may mis updates (if the no-browse
- * option is specified).
- */
-class LegacyLVQ : public MessageMap
-{
- public:
- LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
- bool deleted(const QueuedMessage&);
- bool acquire(const framing::SequenceNumber&, QueuedMessage&);
- bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
- bool push(const QueuedMessage& added, QueuedMessage& removed);
- void removeIf(Predicate);
- void setNoBrowse(bool);
- static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current,
- const std::string& key, bool noBrowse,
- Broker* broker);
- protected:
- bool noBrowse;
- Broker* broker;
-
- const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_LEGACYLVQ_H*/
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Link.cpp Mon Aug 27 15:40:33 2012
@@ -92,10 +92,10 @@ public:
// Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
// and saving them should the Link need to reconnect.
- void route(broker::Deliverable& msg)
+ void route(broker::Deliverable& /*msg*/)
{
if (!link) return;
- const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
+ const framing::FieldTable* headers = 0;//TODO: msg.getMessage().getApplicationHeaders();
framing::Array addresses;
if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
// convert the Array of addresses to a single Url container for used with setUrl():
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp Mon Aug 27 15:40:33 2012
@@ -119,6 +119,7 @@ pair<Link::shared_ptr, bool> LinkRegistr
parent, failover));
if (durable && store) store->create(*link);
links[name] = link;
+ pendingLinks[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
@@ -229,6 +230,7 @@ void LinkRegistry::linkDestroyed(Link *l
QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
+ pendingLinks.erase(link->getName());
LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
@@ -322,10 +324,12 @@ void LinkRegistry::notifyConnection(cons
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
if (l->second->pendingConnection(host, port)) {
link = l->second;
+ pendingLinks.erase(l);
connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
break;
}
}
@@ -347,6 +351,10 @@ void LinkRegistry::notifyClosed(const st
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->closed(0, "Closed by peer");
}
}
@@ -355,6 +363,10 @@ void LinkRegistry::notifyConnectionForce
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->notifyConnectionForced(text);
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h Mon Aug 27 15:40:33 2012
@@ -47,6 +47,7 @@ namespace broker {
LinkMap links; /** indexed by name of Link */
BridgeMap bridges; /** indexed by name of Bridge */
ConnectionMap connections; /** indexed by connection identifier, gives link name */
+ LinkMap pendingLinks; /** pending connection, indexed by name of Link */
qpid::sys::Mutex lock;
Broker* broker;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp Mon Aug 27 15:40:33 2012
@@ -20,19 +20,12 @@
*/
#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/MapHandler.h"
#include "qpid/StringUtils.h"
-#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/SendContent.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/TypeFilter.h"
-#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
+#include <string.h>
#include <time.h>
using boost::intrusive_ptr;
@@ -41,492 +34,261 @@ using qpid::sys::Duration;
using qpid::sys::TIME_MSEC;
using qpid::sys::FAR_FUTURE;
using std::string;
-using namespace qpid::framing;
namespace qpid {
namespace broker {
-TransferAdapter Message::TRANSFER;
-
-Message::Message(const framing::SequenceNumber& id) :
- frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), dequeueCallback(0),
- inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
-{}
-
-Message::~Message() {}
-
-void Message::forcePersistent()
+Message::Message() : deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {}
+Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
+ : encoding(e), persistentContext(p), deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false)
{
- sys::Mutex::ScopedLock l(lock);
- // only set forced bit if we actually need to force.
- if (! getAdapter().isPersistent(frames) ){
- forcePersistentPolicy = true;
- }
+ if (persistentContext) persistentContext->setIngressCompletion(e);
}
+Message::~Message() {}
-bool Message::isForcedPersistent()
-{
- return forcePersistentPolicy;
-}
std::string Message::getRoutingKey() const
{
- return getAdapter().getRoutingKey(frames);
-}
-
-std::string Message::getExchangeName() const
-{
- return getAdapter().getExchange(frames);
-}
-
-const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
-{
- if (!exchange) {
- exchange = registry.get(getExchangeName());
- }
- return exchange;
-}
-
-bool Message::isImmediate() const
-{
- return getAdapter().isImmediate(frames);
-}
-
-const FieldTable* Message::getApplicationHeaders() const
-{
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getApplicationHeaders(frames);
-}
-
-std::string Message::getAppId() const
-{
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getAppId(frames);
+ return getEncoding().getRoutingKey();
}
bool Message::isPersistent() const
{
- sys::Mutex::ScopedLock l(lock);
- return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
+ return getEncoding().isPersistent();
}
-bool Message::requiresAccept()
+uint64_t Message::getContentSize() const
{
- return getAdapter().requiresAccept(frames);
+ return getEncoding().getContentSize();
}
-uint32_t Message::getRequiredCredit()
+boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
{
- sys::Mutex::ScopedLock l(lock);
- if (!requiredCredit) {
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- requiredCredit = sum.getSize();
- }
- return requiredCredit;
+ return encoding;
}
-void Message::encode(framing::Buffer& buffer) const
+namespace
{
- sys::Mutex::ScopedLock l(lock);
- //encode method and header frames
- EncodeFrame f1(buffer);
- frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-
- //then encode the payload of each content frame
- framing::EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+const std::string X_QPID_TRACE("x-qpid.trace");
}
-void Message::encodeContent(framing::Buffer& buffer) const
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
- sys::Mutex::ScopedLock l(lock);
- //encode the payload of each content frame
- EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+ std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (traceStr.size()) {
+ std::vector<std::string> trace = split(traceStr, ", ");
+ for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+ for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+ if (*i == *j) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
}
-uint32_t Message::encodedSize() const
+void Message::addTraceId(const std::string& id)
{
- return encodedHeaderSize() + encodedContentSize();
+ std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (trace.empty()) {
+ annotations[X_QPID_TRACE] = id;
+ } else if (trace.find(id) == std::string::npos) {
+ trace += ",";
+ trace += id;
+ annotations[X_QPID_TRACE] = trace;
+ }
+ annotationsChanged();
}
-uint32_t Message::encodedContentSize() const
+void Message::clearTrace()
{
- sys::Mutex::ScopedLock l(lock);
- return frames.getContentSize();
+ annotations[X_QPID_TRACE] = std::string();
+ annotationsChanged();
}
-uint32_t Message::encodedHeaderSize() const
+void Message::setTimestamp()
{
- sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size
- //add up the size for all method and header frames in the frameset
- SumFrameSize sum;
- frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
- return sum.getSize();
+ timestamp = ::time(0); // AMQP-0.10: posix time_t - secs since Epoch
}
-void Message::decodeHeader(framing::Buffer& buffer)
+uint64_t Message::getTimestamp() const
{
- AMQFrame method;
- method.decode(buffer);
- frames.append(method);
-
- AMQFrame header;
- header.decode(buffer);
- frames.append(header);
+ return timestamp;
}
-void Message::decodeContent(framing::Buffer& buffer)
+uint64_t Message::getTtl() const
{
- if (buffer.available()) {
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame((AMQContentBody()));
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frame.setFirstSegment(false);
- frames.append(frame);
+ uint64_t ttl;
+ if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
+ sys::AbsTime current(
+ expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ sys::Duration ttl(current, getExpiration());
+ // convert from ns to ms; set to 1 if expired
+ return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
} else {
- //adjust header flags
- MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- }
- //mark content loaded
- loaded = true;
-}
-
-// Used for testing only
-void Message::tryReleaseContent()
-{
- if (checkContentReleasable()) {
- releaseContent();
- }
-}
-
-void Message::releaseContent(MessageStore* s)
-{
- //deprecated, use setStore(store); releaseContent(); instead
- if (!store) setStore(s);
- releaseContent();
-}
-
-void Message::releaseContent()
-{
- sys::Mutex::ScopedLock l(lock);
- if (store) {
- if (!getPersistenceId()) {
- intrusive_ptr<PersistableMessage> pmsg(this);
- store->stage(pmsg);
- staged = true;
- }
- //ensure required credit and size is cached before content frames are released
- getRequiredCredit();
- contentSize();
- //remove any content frames from the frameset
- frames.remove(TypeFilter<CONTENT_BODY>());
- setContentReleased();
+ return 0;
}
}
-void Message::destroy()
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- if (staged) {
- if (store) {
- store->destroy(*this);
- } else {
- QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ //TODO: this is still quite 0-10 specific...
+ uint64_t ttl;
+ if (getEncoding().getTtl(ttl)) {
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), duration);
+ setExpiryPolicy(e);
}
}
}
-bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
+void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
{
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
-}
-
-void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
-{
- sys::Mutex::ScopedLock l(lock);
- if (isContentReleased() && !frames.isComplete()) {
- sys::Mutex::ScopedUnlock u(lock);
- uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- bool morecontent = true;
- for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
- AMQFrame frame((AMQContentBody()));
- morecontent = getContentFrame(queue, frame, maxContentSize, offset);
- out.handle(frame);
- }
- queue.countLoadedFromDisk(contentSize());
- } else {
- Count c;
- frames.map_if(c, TypeFilter<CONTENT_BODY>());
-
- SendContent f(out, maxFrameSize, c.getCount());
- frames.map_if(f, TypeFilter<CONTENT_BODY>());
- }
+ annotations[key] = value;
+ annotationsChanged();
}
-void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
+void Message::annotationsChanged()
{
- sys::Mutex::ScopedLock l(lock);
- Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- //as frame (and pointer to body) has now been passed to handler,
- //subsequent modifications should use a copy
- copyHeaderOnWrite = true;
-}
-
-// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
-// 0-8/0-9 message differences.
-MessageAdapter& Message::getAdapter() const
-{
- if (!adapter) {
- if(frames.isA<MessageTransferBody>()) {
- adapter = &TRANSFER;
- } else {
- const AMQMethodBody* method = frames.getMethod();
- if (!method) throw Exception("Can't adapt message with no method");
- else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
- }
+ if (persistentContext) {
+ persistentContext = persistentContext->merge(annotations);
+ persistentContext->setIngressCompletion(encoding);
}
- return *adapter;
}
-uint64_t Message::contentSize() const
-{
- return frames.getContentSize();
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
}
-bool Message::isContentLoaded() const
+bool Message::hasExpired() const
{
- return loaded;
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
-
-namespace
+uint8_t Message::getPriority() const
{
-const std::string X_QPID_TRACE("x-qpid.trace");
+ return getEncoding().getPriority();
}
-bool Message::isExcluded(const std::vector<std::string>& excludes) const
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+qpid::framing::SequenceNumber Message::getSequence() const
{
- sys::Mutex::ScopedLock l(lock);
- const FieldTable* headers = getApplicationHeaders();
- if (headers) {
- std::string traceStr = headers->getAsString(X_QPID_TRACE);
- if (traceStr.size()) {
- std::vector<std::string> trace = split(traceStr, ", ");
-
- for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
- for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
- if (*i == *j) {
- return true;
- }
- }
- }
- }
- }
- return false;
+ return sequence;
}
-
-class CloneHeaderBody
+void Message::setSequence(const qpid::framing::SequenceNumber& s)
{
-public:
- void operator()(AMQFrame& f)
- {
- f.cloneBody();
- }
-};
-
-AMQHeaderBody* Message::getHeaderBody()
-{
- // expects lock to be held
- if (copyHeaderOnWrite) {
- CloneHeaderBody f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- copyHeaderOnWrite = false;
- }
- return frames.getHeaders();
+ sequence = s;
}
-void Message::addTraceId(const std::string& id)
+MessageState Message::getState() const
{
- sys::Mutex::ScopedLock l(lock);
- if (isA<MessageTransferBody>()) {
- FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getAsString(X_QPID_TRACE);
- if (trace.empty()) {
- headers.setString(X_QPID_TRACE, id);
- } else if (trace.find(id) == std::string::npos) {
- trace += ",";
- trace += id;
- headers.setString(X_QPID_TRACE, trace);
- }
- }
+ return state;
}
-
-void Message::clearTrace()
+void Message::setState(MessageState s)
{
- sys::Mutex::ScopedLock l(lock);
- if (isA<MessageTransferBody>()) {
- FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getAsString(X_QPID_TRACE);
- if (!trace.empty()) {
- headers.setString(X_QPID_TRACE, "");
- }
- }
+ state = s;
}
-void Message::setTimestamp()
+const qpid::types::Variant::Map& Message::getAnnotations() const
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- time_t now = ::time(0);
- props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+ return annotations;
}
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+qpid::types::Variant Message::getAnnotation(const std::string& key) const
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- if (props->getTtl()) {
- // AMQP requires setting the expiration property to be posix
- // time_t in seconds. TTL is in milliseconds
- if (!props->getExpiration()) {
- //only set expiration in delivery properties if not already set
- time_t now = ::time(0);
- props->setExpiration(now + (props->getTtl()/1000));
- }
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration ttl(std::min(props->getTtl() * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), ttl);
- setExpiryPolicy(e);
- }
- }
+ qpid::types::Variant::Map::const_iterator i = annotations.find(key);
+ if (i != annotations.end()) return i->second;
+ //FIXME: modify Encoding interface to allow retrieval of
+ //annotations of different types from the message data as received
+ //off the wire
+ return qpid::types::Variant(getEncoding().getAnnotationAsString(key));
}
-void Message::adjustTtl()
+std::string Message::getUserId() const
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- if (props->getTtl()) {
- if (expiration < FAR_FUTURE) {
- sys::AbsTime current(
- expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- sys::Duration ttl(current, getExpiration());
- // convert from ns to ms; set to 1 if expired
- props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
- }
- }
+ return encoding->getUserId();
}
-void Message::setRedelivered()
+Message::Encoding& Message::getEncoding()
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ return *encoding;
}
-
-void Message::insertCustomProperty(const std::string& key, int64_t value)
+const Message::Encoding& Message::getEncoding() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+ return *encoding;
}
-
-void Message::insertCustomProperty(const std::string& key, const std::string& value)
+Message::operator bool() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+ return encoding;
}
-void Message::removeCustomProperty(const std::string& key)
+std::string Message::getContent() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+ return encoding->getContent();
}
-void Message::setExchange(const std::string& exchange)
+std::string Message::getPropertyAsString(const std::string& key) const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+ return encoding->getPropertyAsString(key);
}
-
-void Message::clearApplicationHeadersFlag()
+namespace {
+class PropertyRetriever : public MapHandler
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
-}
-
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
- expiryPolicy = e;
-}
+ public:
+ PropertyRetriever(const std::string& key) : name(key) {}
+ void handleVoid(const CharSequence&) {}
+ void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
+ void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+ void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+ void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+ void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
+ void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+ void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+ void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+ void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+ void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+ {
+ if (matches(key)) result = std::string(value.data, value.size);
+ }
+ qpid::types::Variant getResult() { return result; }
-bool Message::hasExpired()
-{
- return expiryPolicy && expiryPolicy->hasExpired(*this);
-}
+ private:
+ std::string name;
+ qpid::types::Variant result;
-namespace {
-struct ScopedSet {
- sys::Monitor& lock;
- bool& flag;
- ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
- sys::Monitor::ScopedLock sl(lock);
- flag = true;
+ bool matches(const CharSequence& key)
+ {
+ return ::strncmp(key.data, name.data(), std::min(key.size, name.size())) == 0;
}
- ~ScopedSet(){
- sys::Monitor::ScopedLock sl(lock);
- flag = false;
- lock.notifyAll();
+
+ template <typename T> void handle(const CharSequence& key, T value)
+ {
+ if (matches(key)) result = value;
}
};
}
-
-void Message::allDequeuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = dequeueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
-void Message::setDequeueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = &cb;
+qpid::types::Variant Message::getProperty(const std::string& key) const
+{
+ PropertyRetriever r(key);
+ encoding->processProperties(r);
+ return r.getResult();
}
-void Message::resetDequeueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = 0;
+boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
+{
+ return persistentContext;
}
-uint8_t Message::getPriority() const {
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getPriority(frames);
+void Message::processProperties(MapHandler& handler) const
+{
+ encoding->processProperties(handler);
}
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h Mon Aug 27 15:40:33 2012
@@ -23,194 +23,131 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageAdapter.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
-#include <boost/function.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <memory>
+#include "qpid/types/Variant.h"
+//TODO: move the following out of framing or replace it
+#include "qpid/framing/SequenceNumber.h"
#include <string>
#include <vector>
-namespace qpid {
-
-namespace framing {
-class AMQBody;
-class AMQHeaderBody;
-class FieldTable;
-class SequenceNumber;
-}
+#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableMessage.h"
+namespace qpid {
namespace broker {
class ConnectionToken;
-class Exchange;
-class ExchangeRegistry;
-class MessageStore;
-class Queue;
-class ExpiryPolicy;
+class MapHandler;
+
+enum MessageState
+{
+ AVAILABLE=1,
+ ACQUIRED=2,
+ DELETED=4,
+ UNAVAILABLE=8
+};
-class Message : public PersistableMessage {
+class Message {
public:
- typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
+ class Encoding : public AsyncCompletion
+ {
+ public:
+ virtual ~Encoding() {}
+ virtual std::string getRoutingKey() const = 0;
+ virtual bool isPersistent() const = 0;
+ virtual uint8_t getPriority() const = 0;
+ virtual uint64_t getContentSize() const = 0;
+ virtual std::string getPropertyAsString(const std::string& key) const = 0;
+ virtual std::string getAnnotationAsString(const std::string& key) const = 0;
+ virtual bool getTtl(uint64_t&) const = 0;
+ virtual bool hasExpiration() const = 0;
+ virtual std::string getContent() const = 0;
+ virtual void processProperties(MapHandler&) const = 0;
+ virtual std::string getUserId() const = 0;
+ };
- QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
+ QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
+ QPID_BROKER_EXTERN Message();
QPID_BROKER_EXTERN ~Message();
- uint64_t getPersistenceId() const { return persistenceId; }
- void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
-
- bool getRedelivered() const { return redelivered; }
- void redeliver() { redelivered = true; }
+ bool isRedelivered() const { return deliveryCount > 1; }
+ void deliver() { ++deliveryCount; }
+ void undeliver() { --deliveryCount; }
+ int getDeliveryCount() const { return deliveryCount; }
+ void resetDeliveryCount() { deliveryCount = 0; }
const ConnectionToken* getPublisher() const { return publisher; }
void setPublisher(ConnectionToken* p) { publisher = p; }
- const framing::SequenceNumber& getCommandId() { return frames.getId(); }
-
- QPID_BROKER_EXTERN uint64_t contentSize() const;
QPID_BROKER_EXTERN std::string getRoutingKey() const;
- const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
- QPID_BROKER_EXTERN std::string getExchangeName() const;
- bool isImmediate() const;
- QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
- QPID_BROKER_EXTERN std::string getAppId() const;
QPID_BROKER_EXTERN bool isPersistent() const;
- bool requiresAccept();
/** determine msg expiration time using the TTL value if present */
QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
- bool hasExpired();
+
+ bool hasExpired() const;
sys::AbsTime getExpiration() const { return expiration; }
void setExpiration(sys::AbsTime exp) { expiration = exp; }
- void adjustTtl();
- void setRedelivered();
- QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
- QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
- QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
- void setExchange(const std::string&);
- void clearApplicationHeadersFlag();
+ uint64_t getTtl() const;
+
/** set the timestamp delivery property to the current time-of-day */
QPID_BROKER_EXTERN void setTimestamp();
+ QPID_BROKER_EXTERN uint64_t getTimestamp() const;
- framing::FrameSet& getFrames() { return frames; }
- const framing::FrameSet& getFrames() const { return frames; }
-
- template <class T> const T* getProperties() const {
- const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>();
- }
-
- template <class T> const T* hasProperties() const {
- const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>();
- }
-
- template <class T> void eraseProperties() {
- qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- p->erase<T>();
- }
-
- template <class T> const T* getMethod() const {
- return frames.as<T>();
- }
-
- template <class T> T* getMethod() {
- return frames.as<T>();
- }
-
- template <class T> bool isA() const {
- return frames.isA<T>();
- }
-
- uint32_t getRequiredCredit();
-
- void encode(framing::Buffer& buffer) const;
- void encodeContent(framing::Buffer& buffer) const;
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- uint32_t encodedSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- uint32_t encodedHeaderSize() const;
- uint32_t encodedContentSize() const;
-
- QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
- QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
- void QPID_BROKER_EXTERN tryReleaseContent();
- void releaseContent();
- void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
- void destroy();
-
- bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
- QPID_BROKER_EXTERN void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
- void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
-
- QPID_BROKER_EXTERN bool isContentLoaded() const;
-
- bool isExcluded(const std::vector<std::string>& excludes) const;
- void addTraceId(const std::string& id);
- void clearTrace();
-
- void forcePersistent();
- bool isForcedPersistent();
-
- /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
- void setDequeueCompleteCallback(MessageCallback& cb);
- void resetDequeueCompleteCallback();
+ QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
+ QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const;
+ QPID_BROKER_EXTERN void addTraceId(const std::string& id);
+ QPID_BROKER_EXTERN void clearTrace();
+ QPID_BROKER_EXTERN uint8_t getPriority() const;
+ QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
+ QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
+ void processProperties(MapHandler&) const;
+
+ QPID_BROKER_EXTERN uint64_t getContentSize() const;
+
+ Encoding& getEncoding();
+ const Encoding& getEncoding() const;
+ QPID_BROKER_EXTERN operator bool() const;
- uint8_t getPriority() const;
bool getIsManagementMessage() const;
void setIsManagementMessage(bool b);
- private:
- MessageAdapter& getAdapter() const;
- void allDequeuesComplete();
- mutable sys::Mutex lock;
- framing::FrameSet frames;
- mutable boost::shared_ptr<Exchange> exchange;
- mutable uint64_t persistenceId;
- bool redelivered;
- bool loaded;
- bool staged;
- bool forcePersistentPolicy; // used to force message as durable, via a broker policy
- ConnectionToken* publisher;
- mutable MessageAdapter* adapter;
- qpid::sys::AbsTime expiration;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const;
+ QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&);
- static TransferAdapter TRANSFER;
+ MessageState getState() const;
+ void setState(MessageState);
- mutable boost::intrusive_ptr<Message> empty;
+ QPID_BROKER_EXTERN qpid::types::Variant getAnnotation(const std::string& key) const;
+ QPID_BROKER_EXTERN const qpid::types::Variant::Map& getAnnotations() const;
+ std::string getUserId() const;
- sys::Monitor callbackLock;
- MessageCallback* dequeueCallback;
- bool inCallback;
+ QPID_BROKER_EXTERN std::string getContent() const;//TODO: may be better to get rid of this...
- uint32_t requiredCredit;
+ QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
+ QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+ private:
+ boost::intrusive_ptr<Encoding> encoding;
+ boost::intrusive_ptr<PersistableMessage> persistentContext;
+ int deliveryCount;
+ ConnectionToken* publisher;
+ qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ uint64_t timestamp;
+ qpid::types::Variant::Map annotations;
bool isManagementMessage;
- mutable bool copyHeaderOnWrite;
+ MessageState state;
+ qpid::framing::SequenceNumber sequence;
- /**
- * Expects lock to be held
- */
- template <class T> T* getModifiableProperties() {
- return getHeaderBody()->get<T>(true);
- }
- qpid::framing::AMQHeaderBody* getHeaderBody();
+ void annotationsChanged();
};
+QPID_BROKER_EXTERN void encode(const Message&, std::string&);
+QPID_BROKER_EXTERN void decode(const std::string&, Message&);
+
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageBuilder.cpp Mon Aug 27 15:40:33 2012
@@ -21,10 +21,11 @@
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using boost::intrusive_ptr;
using namespace qpid::broker;
@@ -36,8 +37,7 @@ namespace
const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder(MessageStore* const _store) :
- state(DORMANT), store(_store) {}
+MessageBuilder::MessageBuilder() : state(DORMANT) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -45,6 +45,7 @@ void MessageBuilder::handle(AMQFrame& fr
switch(state) {
case METHOD:
checkType(METHOD_BODY, type);
+ exchange = frame.castBody<qpid::framing::MessageTransferBody>()->getDestination();
state = HEADER;
break;
case HEADER:
@@ -55,7 +56,9 @@ void MessageBuilder::handle(AMQFrame& fr
header.setBof(false);
header.setEof(false);
message->getFrames().append(header);
- } else if (type != HEADER_BODY) {
+ } else if (type == HEADER_BODY) {
+ frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+ } else {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header or content got "
<< type_str(type) << ")"));
@@ -73,14 +76,14 @@ void MessageBuilder::handle(AMQFrame& fr
void MessageBuilder::end()
{
+ message->computeRequiredCredit();
message = 0;
state = DORMANT;
}
void MessageBuilder::start(const SequenceNumber& id)
{
- message = intrusive_ptr<Message>(new Message(id));
- message->setStore(store);
+ message = intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer>(new qpid::broker::amqp_0_10::MessageTransfer(id));
state = METHOD;
}
@@ -112,3 +115,5 @@ void MessageBuilder::checkType(uint8_t e
<< type_str(expected) << " got " << type_str(actual) << ")"));
}
}
+
+boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> MessageBuilder::getMessage() { return message; }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org