You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC
svn commit: r1371676 [6/8] - in /qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/
cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/
cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Aug 10 12:04:27 2012
@@ -21,6 +21,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
@@ -28,6 +29,7 @@
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -55,9 +57,8 @@ SessionState::SessionState(
const SessionState::Configuration& config, bool delayManagement)
: qpid::SessionState(id, config),
broker(b), handler(&h),
- semanticState(*this, *this),
+ semanticState(*this),
adapter(semanticState),
- msgBuilder(&broker.getStore()),
mgmtObject(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
@@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFram
{
if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFram
header.setEof(false);
msg->getFrames().append(header);
}
+ DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
- msg->setTimestamp();
- msg->setPublisher(&getConnection());
+ deliverable.getMessage().setTimestamp();
+ deliverable.getMessage().setPublisher(&getConnection());
+
+
+ IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().begin();
- semanticState.handle(msg);
+ semanticState.route(deliverable.getMessage(), deliverable);
msgBuilder.end();
- IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
}
@@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& f
handler->out(frame);
}
-void SessionState::deliver(DeliveryRecord& msg, bool sync)
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
- msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
+
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+ method.setEof(false);
+ getProxy().getHandler().handle(method);
+ message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+ message.sendContent(getProxy().getHandler(), maxFrameSize);
+
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
if (sync) {
AMQP_ClientProxy::Execution& p(getProxy().getExecution());
Proxy::ScopedSync s(p);
p.sync();
}
+ return commandId;
}
void SessionState::sendCompletion() {
@@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSy
}
}
-
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
@@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter
/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+ std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
bool unique = pendingMsgs.insert(item).second;
if (!unique) {
assert(false);
@@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter
/** done when an execution.sync arrives */
void SessionState::AsyncCommandCompleter::flushPendingMessages()
{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
}
// drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Aug 10 12:04:27 2012
@@ -23,17 +23,18 @@
*/
#include "qpid/SessionState.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
-#include "qpid/broker/DeliveryAdapter.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
@@ -58,7 +59,6 @@ namespace broker {
class Broker;
class ConnectionState;
-class Message;
class SessionHandler;
class SessionManager;
@@ -68,7 +68,6 @@ class SessionManager;
*/
class SessionState : public qpid::SessionState,
public SessionContext,
- public DeliveryAdapter,
public management::Manageable,
public framing::FrameHandler::InOutHandler
{
@@ -105,8 +104,10 @@ class SessionState : public qpid::Sessio
void sendCompletion();
- //delivery adapter methods:
- void deliver(DeliveryRecord&, bool sync);
+ DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
@@ -117,7 +118,7 @@ class SessionState : public qpid::Sessio
// Used by cluster to create replica sessions.
SemanticState& getSemanticState() { return semanticState; }
- boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessageInProgress() { return msgBuilder.getMessage(); }
SessionAdapter& getSessionAdapter() { return adapter; }
const SessionId& getSessionId() const { return getId(); }
@@ -199,7 +200,7 @@ class SessionState : public qpid::Sessio
// If an ingress message does not require a Sync, we need to
// hold a reference to it in case an Execution.Sync command is received and we
// have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > pendingMsgs;
/** complete all pending commands, runs in IO thread */
void completeCommands();
@@ -212,7 +213,7 @@ class SessionState : public qpid::Sessio
~AsyncCommandCompleter() {};
/** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
+ void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
void deletePendingMessage(SequenceNumber id);
void flushPendingMessages();
/** schedule the processing of a completed ingress message.transfer command */
@@ -246,29 +247,29 @@ class SessionState : public qpid::Sessio
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
- boost::intrusive_ptr<Message> m )
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
: AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
+ session(ss),
+ msg(m),
+ requiresAccept(m->requiresAccept()),
+ requiresSync(m->getFrames().getMethod()->isSync()),
+ pending(false) {}
IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
: AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
+ session(x.session),
+ msg(x.msg),
+ requiresAccept(x.requiresAccept),
+ requiresSync(x.requiresSync),
+ pending(x.pending) {}
- virtual ~IncompleteIngressMsgXfer() {};
+ virtual ~IncompleteIngressMsgXfer() {};
virtual void completed(bool);
virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
SessionState *session; // only valid if sync flag in callback is true
- boost::intrusive_ptr<Message> msg;
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg;
bool requiresAccept;
bool requiresSync;
bool pending; // true if msg saved on pending list...
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Fri Aug 10 12:04:27 2012
@@ -20,7 +20,8 @@
*/
#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
@@ -30,21 +31,20 @@ namespace qpid {
namespace broker {
namespace {
const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
+bool isQMFv2(const Message& message)
{
- const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+ const qpid::framing::MessageProperties* props = qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<qpid::framing::MessageProperties>();
return props && props->getAppId() == "qmf2";
}
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+bool isThresholdEvent(const Message& message)
{
- if (message->getIsManagementMessage()) {
+ if (message.getIsManagementMessage()) {
//is this a qmf event? if so is it a threshold event?
if (isQMFv2(message)) {
- const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
- if (headers && headers->getAsString("qmf.content") == "_event") {
+ if (message.getPropertyAsString("qmf.content") == "_event") {
//decode as list
- std::string content = message->getFrames().getContent();
+ std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
qpid::types::Variant::List list;
qpid::amqp_0_10::ListCodec::decode(content, list);
if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
@@ -57,7 +57,7 @@ bool isThresholdEvent(const boost::intru
}
}
} else {
- std::string content = message->getFrames().getContent();
+ std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
buffer.getLong();//sequence
@@ -83,9 +83,9 @@ ThresholdAlerts::ThresholdAlerts(const s
repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
-void ThresholdAlerts::enqueued(const QueuedMessage& m)
+void ThresholdAlerts::enqueued(const Message& m)
{
- size += m.payload->contentSize();
+ size += m.getContentSize();
++count;
if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
@@ -94,7 +94,7 @@ void ThresholdAlerts::enqueued(const Que
//enqueued on queues; it may even be that this event
//causes a message to be enqueued on the queue we are
//tracking, and so we need to avoid recursing
- if (isThresholdEvent(m.payload)) return;
+ if (isThresholdEvent(m)) return;
lastAlert = qpid::sys::now();
agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
@@ -102,9 +102,9 @@ void ThresholdAlerts::enqueued(const Que
}
}
-void ThresholdAlerts::dequeued(const QueuedMessage& m)
+void ThresholdAlerts::dequeued(const Message& m)
{
- size -= m.payload->contentSize();
+ size -= m.getContentSize();
--count;
if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
lastAlert = qpid::sys::EPOCH;
@@ -127,65 +127,14 @@ void ThresholdAlerts::observe(Queue& que
}
void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio)
-
-{
- qpid::types::Variant::Map map;
- qpid::amqp_0_10::translate(settings, map);
- observe(queue, agent, map, limitRatio);
-}
-
-template <class T>
-class Option
+ const QueueSettings& settings, uint16_t limitRatio)
{
- public:
- Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
- void addAlias(const std::string& name) { names.push_back(name); }
- T get(const qpid::types::Variant::Map& settings) const
- {
- T value(defaultValue);
- for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
- if (get(settings, *i, value)) break;
- }
- return value;
- }
- private:
- std::vector<std::string> names;
- T defaultValue;
-
- bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
- {
- qpid::types::Variant::Map::const_iterator i = settings.find(name);
- if (i != settings.end()) {
- try {
- value = (T) i->second;
- } catch (const qpid::types::InvalidConversion&) {
- QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
- }
- return true;
- } else {
- return false;
- }
- }
-};
-
-void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio)
-
-{
- //Note: aliases are keys defined by java broker
- Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
- repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
-
//If no explicit threshold settings were given use specified
//percentage of any limit from the policy.
- const QueuePolicy* policy = queue.getPolicy();
- Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
- countThreshold.addAlias("x-qpid-maximum-message-count");
- Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
- sizeThreshold.addAlias("x-qpid-maximum-message-size");
+ uint32_t countThreshold = settings.alertThreshold.hasCount() ? settings.alertThreshold.getCount() : (settings.maxDepth.getCount()*limitRatio/100);
+ uint32_t sizeThreshold = settings.alertThreshold.hasSize() ? settings.alertThreshold.getSize() : (settings.maxDepth.getSize()*limitRatio/100);
- observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+ observe(queue, agent, countThreshold, sizeThreshold, settings.alertRepeatInterval);
}
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h Fri Aug 10 12:04:27 2012
@@ -27,15 +27,13 @@
#include <string>
namespace qpid {
-namespace framing {
-class FieldTable;
-}
namespace management {
class ManagementAgent;
}
namespace broker {
class Queue;
+struct QueueSettings;
/**
* Class to manage generation of QMF alerts when particular thresholds
* are breached on a queue.
@@ -48,19 +46,17 @@ class ThresholdAlerts : public QueueObse
const uint32_t countThreshold,
const uint64_t sizeThreshold,
const long repeatInterval);
- void enqueued(const QueuedMessage&);
- void dequeued(const QueuedMessage&);
- void acquired(const QueuedMessage&) {};
- void requeued(const QueuedMessage&) {};
+ void enqueued(const Message&);
+ void dequeued(const Message&);
+ void acquired(const Message&) {};
+ void requeued(const Message&) {};
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
const uint64_t countThreshold,
const uint64_t sizeThreshold,
const long repeatInterval);
static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::framing::FieldTable& settings, uint16_t limitRatio);
- static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
- const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+ const QueueSettings& settings, uint16_t limitRatio);
private:
const std::string name;
qpid::management::ManagementAgent& agent;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Fri Aug 10 12:04:27 2012
@@ -71,7 +71,6 @@ namespace qpid {
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAccept(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
// Used by cluster replication.
const framing::SequenceSet& getAcked() const { return acked; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Aug 10 12:04:27 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
bool TxBuffer::prepare(TransactionContext* const ctxt)
{
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ for(op_iterator i = ops.begin(); i != ops.end(); i++){
if(!(*i)->prepare(ctxt)){
return false;
}
@@ -74,7 +74,3 @@ bool TxBuffer::commitLocal(Transactional
}
return false;
}
-
-void TxBuffer::accept(TxOpConstVisitor& v) const {
- std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
-}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Fri Aug 10 12:04:27 2012
@@ -108,9 +108,6 @@ namespace qpid {
* commit
*/
QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
-
- // Used by cluster to replicate transaction status.
- void accept(TxOpConstVisitor& v) const;
};
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Fri Aug 10 12:04:27 2012
@@ -21,7 +21,6 @@
#ifndef _TxOp_
#define _TxOp_
-#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/TransactionalStore.h"
#include <boost/shared_ptr.hpp>
@@ -36,8 +35,6 @@ namespace qpid {
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
-
- virtual void accept(TxOpConstVisitor&) const = 0;
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h Fri Aug 10 12:04:27 2012
@@ -1,97 +0,0 @@
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
- virtual ~TxOpConstVisitor() {}
- virtual void operator()(const DtxAck&) = 0;
- virtual void operator()(const RecoveredDequeue&) = 0;
- virtual void operator()(const RecoveredEnqueue&) = 0;
- virtual void operator()(const TxAccept&) = 0;
- virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_TXOPVISITOR_H*/
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
- virtual ~TxOpConstVisitor() {}
- virtual void operator()(const DtxAck&) = 0;
- virtual void operator()(const RecoveredDequeue&) = 0;
- virtual void operator()(const RecoveredEnqueue&) = 0;
- virtual void operator()(const TxAccept&) = 0;
- virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_TXOPVISITOR_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Fri Aug 10 12:04:27 2012
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/Queue.h"
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-
-TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-
-bool TxPublish::prepare(TransactionContext* ctxt) throw()
-{
- try{
- while (!queues.empty()) {
- prepare(ctxt, queues.front());
- prepared.push_back(queues.front());
- queues.pop_front();
- }
- return true;
- }catch(const std::exception& e){
- QPID_LOG(error, "Failed to prepare: " << e.what());
- }catch(...){
- QPID_LOG(error, "Failed to prepare (unknown error)");
- }
- return false;
-}
-
-void TxPublish::commit() throw()
-{
- try {
- for_each(prepared.begin(), prepared.end(), Commit(msg));
- if (msg->isContentReleaseRequested()) {
- // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
- // presence of these messages). Do not change these without also checking these tests.
- if (msg->isContentReleaseBlocked()) {
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
- } else {
- msg->releaseContent();
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
- }
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to commit: " << e.what());
- } catch(...) {
- QPID_LOG(error, "Failed to commit (unknown error)");
- }
-}
-
-void TxPublish::rollback() throw()
-{
- try {
- for_each(prepared.begin(), prepared.end(), Rollback(msg));
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to complete rollback: " << e.what());
- } catch(...) {
- QPID_LOG(error, "Failed to complete rollback (unknown error)");
- }
-
-}
-
-void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
- if (!queue->isLocal(msg)) {
- queues.push_back(queue);
- delivered = true;
- } else {
- QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
- }
-}
-
-void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
-{
- queue->enqueue(ctxt, msg);
-}
-
-TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
- queue->process(msg);
-}
-
-TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
- queue->enqueueAborted(msg);
-}
-
-uint64_t TxPublish::contentSize ()
-{
- return msg->contentSize ();
-}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Fri Aug 10 12:04:27 2012
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxPublish_
-#define _TxPublish_
-
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/TxOp.h"
-
-#include <algorithm>
-#include <functional>
-#include <list>
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
- class Commit{
- boost::intrusive_ptr<Message>& msg;
- public:
- Commit(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
- class Rollback{
- boost::intrusive_ptr<Message>& msg;
- public:
- Rollback(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
-
- boost::intrusive_ptr<Message> msg;
- std::list<boost::shared_ptr<Queue> > queues;
- std::list<boost::shared_ptr<Queue> > prepared;
-
- void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
- public:
- QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
- QPID_BROKER_EXTERN virtual void commit() throw();
- QPID_BROKER_EXTERN virtual void rollback() throw();
-
- virtual Message& getMessage() { return *msg; };
-
- QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
- virtual ~TxPublish(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
- QPID_BROKER_EXTERN uint64_t contentSize();
-
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
- const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
- const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
-}
-
-
-#endif
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,366 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageTransfer.h"
+#include "qpid/broker/MapHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
+namespace amqp_0_10 {
+namespace {
+// QMF2 is compared against the message app-id and PARTIAL is looked up in
+// the application headers by the QMF helper predicates further down.
+const std::string QMF2 = "qmf2";
+const std::string PARTIAL = "partial";
+}
+// Public constructors: start from an empty frameset (optionally tagged with
+// the given command id). requiredCredit is zero-initialised, matching the
+// private FrameSet constructor, so getRequiredCredit() never yields an
+// indeterminate value before computeRequiredCredit() has run.
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0) {}
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0) {}
+
+/** @return the content size reported by the underlying frameset. */
+uint64_t MessageTransfer::getContentSize() const
+{
+    const uint64_t contentBytes = frames.getContentSize();
+    return contentBytes;
+}
+
+/**
+ * Look up key in the application headers of the message properties.
+ * @return the value rendered by FieldTable::getAsString, or an empty string
+ * when there are no message properties or no application headers.
+ */
+std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
+{
+    const qpid::framing::MessageProperties* props = getProperties<qpid::framing::MessageProperties>();
+    if (!props || !props->hasApplicationHeaders()) return std::string();
+    return props->getApplicationHeaders().getAsString(key);
+}
+/** Properties are served by the same lookup as annotations. */
+std::string MessageTransfer::getPropertyAsString(const std::string& key) const
+{
+    return getAnnotationAsString(key);
+}
+
+/**
+ * Fetch the TTL from the delivery properties.
+ * @param result receives the TTL; left untouched when none is set.
+ * @return true if a TTL was present.
+ */
+bool MessageTransfer::getTtl(uint64_t& result) const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    const bool present = delivery && delivery->hasTtl();
+    if (present) result = delivery->getTtl();
+    return present;
+}
+/** @return true if delivery properties exist and carry an expiration. */
+bool MessageTransfer::hasExpiration() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    return delivery && delivery->hasExpiration();
+}
+
+/** @return the priority from the delivery properties, or 0 when none is set. */
+uint8_t MessageTransfer::getPriority() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    if (!delivery || !delivery->hasPriority()) return 0;
+    return delivery->getPriority();
+}
+
+/**
+ * @return the destination named by the message.transfer method frame, or an
+ * empty string when the frameset does not hold such a method. The null check
+ * mirrors requiresAccept(), which guards the same as<>() call.
+ */
+std::string MessageTransfer::getExchangeName() const
+{
+    const framing::MessageTransferBody* transfer = getFrames().as<framing::MessageTransferBody>();
+    return transfer ? transfer->getDestination() : std::string();
+}
+
+/** @return true if a transfer method is present and its accept-mode is 0 (explicit). */
+bool MessageTransfer::requiresAccept() const
+{
+    const framing::MessageTransferBody* transfer = getFrames().as<framing::MessageTransferBody>();
+    if (!transfer) return false;
+    return transfer->getAcceptMode() == 0; /*EXPLICIT == 0*/
+}
+/** @return the credit value last stored by computeRequiredCredit(). */
+uint32_t MessageTransfer::getRequiredCredit() const
+{
+    const uint32_t credit = requiredCredit;
+    return credit;
+}
+/** Recompute requiredCredit from the frames currently in the frameset. */
+void MessageTransfer::computeRequiredCredit()
+{
+    typedef qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY> HeaderOrContent;
+
+    // Credit is the summed body size of every header and content frame.
+    qpid::framing::SumBodySize bodyBytes;
+    frames.map_if(bodyBytes, HeaderOrContent());
+    requiredCredit = bodyBytes.getSize();
+}
+/** Static convenience: required credit of the 0-10 encoding behind msg. */
+uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
+{
+    //TODO: may need to reflect annotations and other modifications in this also
+    const MessageTransfer& transfer = get(msg);
+    return transfer.getRequiredCredit();
+}
+
+/** Mutable access to the frameset backing this message. */
+qpid::framing::FrameSet& MessageTransfer::getFrames()
+{
+    qpid::framing::FrameSet& self = frames;
+    return self;
+}
+/** Read-only access to the frameset backing this message. */
+const qpid::framing::FrameSet& MessageTransfer::getFrames() const
+{
+    const qpid::framing::FrameSet& self = frames;
+    return self;
+}
+/**
+ * Hand every content frame to out via SendContent.
+ * @param out          handler receiving the frames
+ * @param maxFrameSize frame size limit passed on to SendContent
+ */
+void MessageTransfer::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+    typedef qpid::framing::TypeFilter<qpid::framing::CONTENT_BODY> ContentOnly;
+
+    // First pass: SendContent needs the number of content frames up front.
+    qpid::framing::Count contentFrames;
+    frames.map_if(contentFrames, ContentOnly());
+
+    // Second pass: actually push the content frames through.
+    qpid::framing::SendContent sender(out, maxFrameSize, contentFrames.getCount());
+    frames.map_if(sender, ContentOnly());
+}
+
+/**
+ * Functor applied to header frames on the way out. Each frame is copied and,
+ * if any override is requested, its body is cloned and adjusted before the
+ * copy is passed to the handler.
+ */
+class SendHeader
+{
+  public:
+    SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a)
+        : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {}
+
+    void operator()(const AMQFrame& f)
+    {
+        AMQFrame copy = f;
+        const bool annotate = annotations.size() != 0;
+        const bool adjustDelivery = redelivered || ttl || timestamp;
+        if (annotate || adjustDelivery) {
+            // Clone before modifying so the changes stay local to this copy.
+            copy.cloneBody();
+            if (annotate) applyAnnotations(copy);
+            if (adjustDelivery) applyDeliveryOverrides(copy);
+        }
+        handler.handle(copy);
+    }
+  private:
+    // Write every annotation into the application headers as a string.
+    void applyAnnotations(AMQFrame& frame) const
+    {
+        MessageProperties* props = frame.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
+        qpid::types::Variant::Map::const_iterator entry = annotations.begin();
+        while (entry != annotations.end()) {
+            props->getApplicationHeaders().setString(entry->first, entry->second.asString());
+            ++entry;
+        }
+    }
+
+    // Apply only the non-zero / true delivery overrides.
+    void applyDeliveryOverrides(AMQFrame& frame) const
+    {
+        DeliveryProperties* dp = frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true);
+        if (ttl) dp->setTtl(ttl);
+        if (redelivered) dp->setRedelivered(redelivered);
+        if (timestamp) dp->setTimestamp(timestamp);
+    }
+
+    FrameHandler& handler;
+    bool redelivered;
+    uint64_t ttl;
+    uint64_t timestamp;
+    const qpid::types::Variant::Map& annotations;
+};
+
+/**
+ * Send the header frame(s) to out, applying the given overrides through a
+ * SendHeader functor. The maxFrameSize argument is not used.
+ */
+void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/,
+                                 bool redelivered, uint64_t ttl, uint64_t timestamp,
+                                 const qpid::types::Variant::Map& annotations) const
+{
+    typedef TypeFilter<HEADER_BODY> HeaderOnly;
+    SendHeader rewriter(out, redelivered, ttl, timestamp, annotations);
+    frames.map_if(rewriter, HeaderOnly());
+}
+/** Not implemented yet: always reports that immediate delivery is not required. */
+bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/)
+{
+    //TODO
+    const bool immediate = false;
+    return immediate;
+}
+
+/** @return the id of the underlying frameset. */
+const framing::SequenceNumber& MessageTransfer::getCommandId() const
+{
+    return frames.getId();
+}
+
+/** @return the routing key from the delivery properties, or an empty string when none is set. */
+std::string MessageTransfer::getRoutingKey() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    if (!delivery || !delivery->hasRoutingKey()) return std::string();
+    return delivery->getRoutingKey();
+}
+/** @return true only when a delivery mode is set and equals 2. */
+bool MessageTransfer::isPersistent() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    return delivery && delivery->hasDeliveryMode() && delivery->getDeliveryMode() == 2;
+}
+
+/** @return the content as assembled by the frameset. */
+std::string MessageTransfer::getContent() const
+{
+    const std::string body = frames.getContent();
+    return body;
+}
+
+/** Decode two frames from buffer, the method frame and then the header frame, appending each to the frameset. */
+void MessageTransfer::decodeHeader(framing::Buffer& buffer)
+{
+    // Pass 0 reads the method frame, pass 1 the header frame.
+    for (int pass = 0; pass < 2; ++pass) {
+        AMQFrame frame;
+        frame.decode(buffer);
+        frames.append(frame);
+    }
+}
+/**
+ * Consume whatever remains in buffer as a single content frame. When nothing
+ * remains, the header frame(s) are run through MarkLastSegment instead.
+ */
+void MessageTransfer::decodeContent(framing::Buffer& buffer)
+{
+    if (!buffer.available()) {
+        //adjust header flags
+        MarkLastSegment markLast;
+        frames.map_if(markLast, TypeFilter<HEADER_BODY>());
+        return;
+    }
+
+    //take the remaining data as the content body of a new frame
+    //and add that frame to the frameset
+    AMQFrame contentFrame((AMQContentBody()));
+    contentFrame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+    contentFrame.setFirstSegment(false);
+    frames.append(contentFrame);
+}
+
+/** Write the method and header frames, then the raw content payload, to buffer. */
+void MessageTransfer::encode(framing::Buffer& buffer) const
+{
+    typedef TypeFilter2<METHOD_BODY, HEADER_BODY> MethodOrHeader;
+    typedef TypeFilter<CONTENT_BODY> ContentOnly;
+
+    //method and header are encoded as complete frames
+    EncodeFrame frameEncoder(buffer);
+    frames.map_if(frameEncoder, MethodOrHeader());
+
+    //content frames contribute only their payload
+    framing::EncodeBody bodyEncoder(buffer);
+    frames.map_if(bodyEncoder, ContentOnly());
+}
+
+/** Write only the payload of each content frame to buffer. */
+void MessageTransfer::encodeContent(framing::Buffer& buffer) const
+{
+    typedef TypeFilter<CONTENT_BODY> ContentOnly;
+    EncodeBody bodyEncoder(buffer);
+    frames.map_if(bodyEncoder, ContentOnly());
+}
+
+/** @return the sum of encodedHeaderSize() and encodedContentSize(). */
+uint32_t MessageTransfer::encodedSize() const
+{
+    const uint32_t headerBytes = encodedHeaderSize();
+    const uint32_t contentBytes = encodedContentSize();
+    return headerBytes + contentBytes;
+}
+
+/** @return the frameset's content size, returned as a 32-bit value. */
+uint32_t MessageTransfer::encodedContentSize() const
+{
+    const uint32_t contentBytes = frames.getContentSize();
+    return contentBytes;
+}
+
+/** @return the combined frame size of all method and header frames in the frameset. */
+uint32_t MessageTransfer::encodedHeaderSize() const
+{
+    typedef TypeFilter2<METHOD_BODY, HEADER_BODY> MethodOrHeader;
+    SumFrameSize frameBytes;
+    frames.map_if(frameBytes, MethodOrHeader());
+    return frameBytes.getSize();
+}
+
+/** @return true if message properties exist, the app-id equals "qmf2" and application headers are present. */
+bool MessageTransfer::isQMFv2() const
+{
+    const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+    if (!props) return false;
+    if (!(props->getAppId() == QMF2)) return false;
+    return props->hasApplicationHeaders();
+}
+
+/** Static form: false when the message's encoding is not a MessageTransfer. */
+bool MessageTransfer::isQMFv2(const qpid::broker::Message& message)
+{
+    const MessageTransfer* encoding = dynamic_cast<const MessageTransfer*>(&message.getEncoding());
+    if (!encoding) return false;
+    return encoding->isQMFv2();
+}
+
+/**
+ * @return true if the correlation id matches, application headers are
+ * present, and the "partial" header is not set.
+ */
+bool MessageTransfer::isLastQMFResponse(const std::string correlation) const
+{
+    const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+    if (!props) return false;
+    if (!(props->getCorrelationId() == correlation)) return false;
+    if (!props->hasApplicationHeaders()) return false;
+    return !props->getApplicationHeaders().isSet(PARTIAL);
+}
+
+/** Static form: false when the message's encoding is not a MessageTransfer. */
+bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation)
+{
+    const MessageTransfer* encoding = dynamic_cast<const MessageTransfer*>(&message.getEncoding());
+    if (!encoding) return false;
+    return encoding->isLastQMFResponse(correlation);
+}
+
+
+/**
+ * Feed every application header to handler, choosing the callback by the
+ * first conversion the value supports: void, uint64, int64, then string.
+ * Values matching none of these are logged at debug level and skipped.
+ * Does nothing when there are no message properties or application headers.
+ */
+void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
+{
+    const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+    if (!mp || !mp->hasApplicationHeaders()) return;
+
+    // Bind by reference: a read-only walk does not need a copy of the table.
+    const FieldTable& ft = mp->getApplicationHeaders();
+    for (FieldTable::const_iterator i = ft.begin(); i != ft.end(); ++i) {
+        qpid::broker::MapHandler::CharSequence key;
+        key.data = i->first.data();
+        key.size = i->first.size();
+        const FieldTable::ValuePtr& v = i->second;
+        //TODO: something more sophisticated...
+        if (v->empty()) {
+            handler.handleVoid(key);
+        } else if (v->convertsTo<uint64_t>()) {
+            handler.handleUint64(key, v->get<uint64_t>());
+        } else if (v->convertsTo<int64_t>()) {
+            handler.handleInt64(key, v->get<int64_t>());
+        } else if (v->convertsTo<std::string>()) {
+            // s must outlive the handleString call since value points into it.
+            std::string s = v->get<std::string>();
+            qpid::broker::MapHandler::CharSequence value;
+            value.data = s.data();
+            value.size = s.size();
+            qpid::broker::MapHandler::CharSequence encoding; encoding.size = 0; encoding.data = 0;
+            handler.handleString(key, value, encoding);
+        } else {
+            QPID_LOG(debug, "Unhandled key!" << *v);
+        }
+    }
+}
+
+/** @return the user id from the message properties, or an empty string when none is set. */
+std::string MessageTransfer::getUserId() const
+{
+    const qpid::framing::MessageProperties* props = getProperties<qpid::framing::MessageProperties>();
+    const bool present = props && props->hasUserId();
+    return present ? std::string(props->getUserId()) : std::string();
+}
+// Private constructor used by merge(): copies the given frameset and starts
+// with a required credit of zero.
+MessageTransfer::MessageTransfer(const qpid::framing::FrameSet& f)
+    : frames(f),
+      requiredCredit(0)
+{}
+
+/**
+ * Build a copy of this message whose application headers additionally hold
+ * each annotation as a string entry. This message itself is not modified.
+ */
+boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+{
+    boost::intrusive_ptr<MessageTransfer> copy(new MessageTransfer(this->frames));
+    qpid::framing::MessageProperties* props = copy->frames.getHeaders()->get<qpid::framing::MessageProperties>(true);
+    qpid::types::Variant::Map::const_iterator entry = annotations.begin();
+    while (entry != annotations.end()) {
+        // Fetched per entry on purpose: with no annotations the application
+        // headers of the copy are never touched.
+        props->getApplicationHeaders().setString(entry->first, entry->second);
+        ++entry;
+    }
+    return copy;
+}
+}
+// qpid::broker namespace, TODO: move these elsewhere!
+/**
+ * Serialise the AMQP 0-10 encoding of in into out as raw bytes.
+ * @param in  message whose encoding is a MessageTransfer
+ * @param out receives exactly encodedSize() bytes; cleared when that is zero
+ */
+void encode(const Message& in, std::string& out)
+{
+    const amqp_0_10::MessageTransfer& transfer = amqp_0_10::MessageTransfer::get(in);
+    const uint32_t size = transfer.encodedSize();
+    if (size == 0) {
+        // Taking &data[0] of an empty vector would be undefined behaviour.
+        out.clear();
+        return;
+    }
+    std::vector<char> data(size);
+    qpid::framing::Buffer buffer(&(data[0]), size);
+    transfer.encode(buffer);
+    // Rewind so the bytes just written can be read back out.
+    buffer.reset();
+    buffer.getRawData(out, size);
+}
+/**
+ * Rebuild a Message from bytes produced by encode(): the method and header
+ * frames are decoded first, then the remaining bytes as content.
+ */
+void decode(const std::string& in, Message& out)
+{
+    // Buffer takes a non-const pointer; the const_cast is kept from the original.
+    char* const bytes = const_cast<char*>(in.data());
+    qpid::framing::Buffer source(bytes, in.size());
+
+    boost::intrusive_ptr<amqp_0_10::MessageTransfer> decoded(new amqp_0_10::MessageTransfer);
+    decoded->decodeHeader(source);
+    decoded->decodeContent(source);
+    out = Message(decoded, decoded);
+}
+
+}} // namespace qpid::broker
Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,132 @@
+#ifndef QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H
+#define QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace broker {
+class Queue;
+namespace amqp_0_10 {
+
+/**
+ * AMQP 0-10 encoding of a broker message, held as a frameset. Implements
+ * both the Message::Encoding and PersistableMessage interfaces.
+ */
+class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage
+{
+  public:
+    QPID_BROKER_EXTERN MessageTransfer();
+    QPID_BROKER_EXTERN MessageTransfer(const qpid::framing::SequenceNumber&);
+
+    std::string getRoutingKey() const;
+    bool isPersistent() const;
+    uint8_t getPriority() const;
+    uint64_t getContentSize() const;
+    std::string getPropertyAsString(const std::string& key) const;
+    std::string getAnnotationAsString(const std::string& key) const;
+    bool getTtl(uint64_t&) const;
+    bool hasExpiration() const;
+    std::string getExchangeName() const;
+    void processProperties(MapHandler&) const;
+    std::string getUserId() const;
+
+    bool requiresAccept() const;
+    const qpid::framing::SequenceNumber& getCommandId() const;
+    QPID_BROKER_EXTERN qpid::framing::FrameSet& getFrames();
+    QPID_BROKER_EXTERN const qpid::framing::FrameSet& getFrames() const;
+
+    /**
+     * @return the property struct of type T from the header body, or 0 when
+     * the frameset has no header body or the body holds no such struct.
+     * Callers already treat 0 as "absent".
+     */
+    template <class T> const T* getProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p ? p->get<T>() : 0;
+    }
+
+    /** Same lookup as getProperties(); a non-null result means T is present. */
+    template <class T> const T* hasProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        return p ? p->get<T>() : 0;
+    }
+    template <class T> const T* getMethod() const {
+        return frames.as<T>();
+    }
+
+    template <class T> T* getMethod() {
+        return frames.as<T>();
+    }
+
+    template <class T> bool isA() const {
+        return frames.isA<T>();
+    }
+
+    /** Remove the property struct of type T; no-op when there is no header body. */
+    template <class T> void eraseProperties() {
+        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        if (p) p->erase<T>();
+    }
+    std::string getContent() const;
+    uint32_t getRequiredCredit() const;
+    void computeRequiredCredit();
+
+    void clearApplicationHeadersFlag();
+    void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
+    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const;
+
+    void decodeHeader(framing::Buffer& buffer);
+    void decodeContent(framing::Buffer& buffer);
+
+    void encode(framing::Buffer& buffer) const;
+    uint32_t encodedSize() const;
+
+    /**
+     * @returns the size of the buffer needed to encode the
+     * 'header' of this message (not just the header frame,
+     * but other meta data e.g.routing key and exchange)
+     */
+    uint32_t encodedHeaderSize() const;
+    boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;
+
+    QPID_BROKER_EXTERN bool isQMFv2() const;
+    QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const;
+
+    static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
+    static uint32_t getRequiredCredit(const qpid::broker::Message&);
+    /**
+     * Access the MessageTransfer behind a message. The cast result is
+     * dereferenced unchecked, so the message's encoding must really be a
+     * MessageTransfer.
+     */
+    static MessageTransfer& get(qpid::broker::Message& message) {
+        return *dynamic_cast<MessageTransfer*>(&message.getEncoding());
+    }
+    static const MessageTransfer& get(const qpid::broker::Message& message) {
+        return *dynamic_cast<const MessageTransfer*>(&message.getEncoding());
+    }
+    QPID_BROKER_EXTERN static bool isQMFv2(const qpid::broker::Message& message);
+    QPID_BROKER_EXTERN static bool isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation);
+  private:
+    qpid::framing::FrameSet frames;
+    uint32_t requiredCredit;
+
+    MessageTransfer(const qpid::framing::FrameSet&);
+    void encodeHeader(framing::Buffer& buffer) const;
+    uint32_t encodedContentSize() const;
+    void encodeContent(framing::Buffer& buffer) const;
+};
+}}} // namespace qpid::broker::amqp_0_10
+
+#endif /*!QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Aug 10 12:04:27 2012
@@ -35,6 +35,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Aug 10 12:04:27 2012
@@ -24,7 +24,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -117,11 +119,6 @@ const string _QUERY_REQUEST("_query_requ
const string BROKER("broker");
const string MEMBERS("members");
-bool isQMFv2(const Message& message) {
- const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
- return props && props->getAppId() == QMF2;
-}
-
template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
@@ -253,18 +250,15 @@ void BrokerReplicator::route(Deliverable
haBroker.setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
-
- const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
- const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
Variant::List list;
try {
- if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
+ if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
- string content = msg.getMessage().getFrames().getContent();
- amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
- if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ string content = msg.getMessage().getContent();
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+
+ if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator event: " << map);
@@ -278,20 +272,20 @@ void BrokerReplicator::route(Deliverable
else if (match<EventUnbind>(schema)) doEventUnbind(values);
else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
}
- } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+ if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
// We have received all of the exchange response.
alternates.clear();
}
@@ -309,13 +303,11 @@ void BrokerReplicator::doEventQueueDecla
Variant::Map argsMap = asMapVoid(values[ARGS]);
bool autoDel = values[AUTODEL].asBool();
bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED &&
- replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
- {
+ if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
string name = values[QNAME].asString();
- QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
@@ -323,10 +315,17 @@ void BrokerReplicator::doEventQueueDecla
broker.getQueues().destroy(name);
stopQueueReplicator(name);
}
- boost::shared_ptr<Queue> queue = createQueue(
- name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
- assert(queue); // Should be created since we destroed the previous queue above.
- if (queue) startQueueReplicator(queue);
+ settings.populate(args, settings.storeSettings);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ settings,
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ userId,
+ remoteHost);
+ assert(result.second); // Should be true since we destroyed existing queue above
+ startQueueReplicator(result.first);
}
}
@@ -343,7 +342,7 @@ void BrokerReplicator::doEventQueueDelet
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+ if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
stopQueueReplicator(name);
broker.deleteQueue(name, userId, remoteHost);
@@ -357,7 +356,7 @@ void BrokerReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
@@ -391,10 +390,10 @@ void BrokerReplicator::doEventBind(Varia
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -411,10 +410,10 @@ void BrokerReplicator::doEventUnbind(Var
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -455,7 +454,7 @@ void BrokerReplicator::doResponseQueue(V
string name(values[NAME].asString());
QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<Queue> queue =
createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
@@ -470,7 +469,7 @@ void BrokerReplicator::doResponseExchang
string name = values[NAME].asString();
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<Exchange> exchange = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
@@ -507,14 +506,14 @@ void BrokerReplicator::doResponseBind(Va
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
<< " key:" << key);
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
exchange->bind(queue, key, &args);
}
}
@@ -544,7 +543,7 @@ void BrokerReplicator::doResponseHaBroke
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
+ if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!broker.getExchanges().registerExchange(qr))
@@ -570,14 +569,14 @@ boost::shared_ptr<Queue> BrokerReplicato
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange)
{
+ QueueSettings settings(durable, autodelete);
+ settings.populate(arguments, settings.storeSettings);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
- durable,
- autodelete,
- 0, // no owner regardless of exclusivity on primary
+ settings,
+ 0,// no owner regardless of exclusivity on primary
string(), // Set alternate exchange below
- arguments,
userId,
remoteHost);
if (result.second) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Aug 10 12:04:27 2012
@@ -180,7 +180,7 @@ void Primary::readyReplica(const Replica
void Primary::queueCreate(const QueuePtr& q) {
// Throw if there is an invalid replication level in the queue settings.
- haBroker.getReplicationTest().replicateLevel(q->getSettings());
+ haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
i->second->queueCreate(q);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Aug 10 12:04:27 2012
@@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public
{
public:
QueueObserver(QueueGuard& g) : guard(g) {}
- void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
- void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ void enqueued(const broker::Message& m) { guard.enqueued(m); }
+ void dequeued(const broker::Message& m) { guard.dequeued(m); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
QueueGuard& guard;
};
@@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q,
QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
-void QueueGuard::enqueued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
- qm.payload->getIngressCompletion().startCompleter();
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
- assert(!delayed.contains(qm.position));
- delayed += qm.position;
+ if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
+ QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
+ assert(false);
+ }
}
}
// NOTE: Called with message lock held.
-void QueueGuard::dequeued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+void QueueGuard::dequeued(const Message& m) {
+ QPID_LOG(trace, logPrefix << "Dequeued " << m);
ReplicatingSubscription* rs=0;
{
Mutex::ScopedLock l(lock);
rs = subscription;
}
- if (rs) rs->dequeued(qm);
- complete(qm);
+ if (rs) rs->dequeued(m);
+ complete(m.getSequence());
+}
+
+// Finish the delayed completion of every entry in [begin, end), logging each
+// sequence number at trace level.
+void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
+    Delayed::iterator current = begin;
+    while (current != end) {
+        QPID_LOG(trace, logPrefix << "Completed " << current->first);
+        current->second->finishCompleter();
+        ++current;
+    }
+}
void QueueGuard::cancel() {
queue.removeObserver(observer);
+ Delayed removed;
{
Mutex::ScopedLock l(lock);
if (delayed.empty()) return; // No need if no delayed messages.
+ delayed.swap(removed);
}
- // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
- queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+ completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubsc
subscription = &rs;
}
-namespace {
-void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
- if (qm.position <= position) guard->complete(qm);
-}
-}
-
bool QueueGuard::subscriptionStart(SequenceNumber position) {
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- if (!delayed.empty() && delayed.front() <= position) {
- // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
- queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
+ Delayed removed;
+ {
+ Mutex::ScopedLock l(lock);
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+ removed.insert(*i);
+ delayed.erase(i++);
+ }
}
+ completeRange(removed.begin(), removed.end());
return position >= range.back;
}
-void QueueGuard::complete(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::complete(SequenceNumber sequence) {
+ boost::intrusive_ptr<broker::AsyncCompletion> m;
{
Mutex::ScopedLock l(lock);
// The same message can be completed twice, by
// ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the set so we only call finishCompleter() once
- if (delayed.contains(qm.position))
- delayed -= qm.position;
- else
- return;
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ m = i->second;
+ delayed.erase(i);
+ }
+
+ }
+ if (m) {
+ QPID_LOG(trace, logPrefix << "Completed " << sequence);
+ m->finishCompleter();
}
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- qm.payload->getIngressCompletion().finishCompleter();
}
}} // namespaces qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Fri Aug 10 12:04:27 2012
@@ -63,15 +63,15 @@ class QueueGuard {
/** QueueObserver override. Delay completion of the message.
* NOTE: Called under the queues message lock.
*/
- void enqueued(const broker::QueuedMessage&);
+ void enqueued(const broker::Message&);
/** QueueObserver override: Complete a delayed message.
* NOTE: Called under the queues message lock.
*/
- void dequeued(const broker::QueuedMessage&);
+ void dequeued(const broker::Message&);
/** Complete a delayed message. */
- void complete(const broker::QueuedMessage&);
+ void complete(framing::SequenceNumber);
/** Complete all delayed messages. */
void cancel();
@@ -108,10 +108,13 @@ class QueueGuard {
sys::Mutex lock;
std::string logPrefix;
broker::Queue& queue;
- framing::SequenceSet delayed;
+ typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+ Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
+
+ void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Aug 10 12:04:27 2012
@@ -120,8 +120,10 @@ void QueueReplicator::initializeBridge(B
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
brokerInfo.asFieldTable());
SequenceNumber front;
- if (ReplicatingSubscription::getFront(*queue, front))
+ if (ReplicatingSubscription::getFront(*queue, front)) {
settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+ }
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
@@ -137,8 +139,7 @@ void QueueReplicator::initializeBridge(B
namespace {
template <class T> T decodeContent(Message& m) {
- std::string content;
- m.getFrames().getContent(content);
+ std::string content = m.getContent();
Buffer buffer(const_cast<char*>(content.c_str()), content.size());
T result;
result.decode(buffer);
@@ -148,9 +149,7 @@ template <class T> T decodeContent(Messa
void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- QueuedMessage message;
- if (queue->acquireMessageAt(n, message))
- queue->dequeue(0, message);
+ queue->dequeueMessageAt(n);
}
// Called in connection thread of the queues bridge to primary.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Aug 10 12:04:27 2012
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug 10 12:04:27 2012
@@ -27,6 +27,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -66,10 +67,10 @@ class DequeueScanner
at = front - 1;
}
- void operator()(const QueuedMessage& qm) {
- if (qm.position >= front && qm.position <= back) {
- if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
- at = qm.position;
+ void operator()(const Message& m) {
+ if (m.getSequence() >= front && m.getSequence() <= back) {
+ if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
+ at = m.getSequence();
}
}
@@ -90,37 +91,23 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-
-/** Dummy consumer used to get the front position on the queue */
-class GetPositionConsumer : public Consumer
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result)
{
- public:
- GetPositionConsumer() :
- Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
- bool deliver(broker::QueuedMessage& ) { return true; }
- void notify() {}
- bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
- bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
- void cancel() {}
- void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const { return true; }
- broker::OwnershipToken* getSession() { return 0; }
-};
-
-
+ result = message.getSequence();
+ return true;
+}
+}
bool ReplicatingSubscription::getNext(
broker::Queue& q, SequenceNumber from, SequenceNumber& result)
{
- boost::shared_ptr<Consumer> c(new GetPositionConsumer);
- c->setPosition(from);
- if (!q.dispatch(c)) return false;
- result = c->getPosition();
- return true;
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
}
bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
- return getNext(q, 0, front);
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
}
/* Called by SemanticState::consume to create a consumer */
@@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubs
const string& name,
Queue::shared_ptr queue,
bool ack,
- bool acquire,
+ bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- dummy(new Queue(mask(name))),
ready(false)
{
try {
@@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubs
queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
scan.finish();
position = backup.back;
+ //move cursor to position
+ queue->seek(*this, position);
}
// NOTE: we are assuming that the messages that are on the backup are
// consistent with those on the primary. If the backup is a replica
@@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
+ position = m.getSequence();
try {
- // Add position events for the subscribed queue, not the internal event queue.
- if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Replicating " << qm);
- {
- Mutex::ScopedLock l(lock);
- assert(position == qm.position);
- // qm.position is the position of the newly enqueued qm on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (qm.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << qm.position));
- if (qm.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before qm was enqueued.
- sendPositionEvent(qm.position-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = qm.position;
+ QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ // FIXME GRS: position is no longer set, so this check is disabled: assert(position == m.getSequence());
+
+ // m.getSequence() is the position of the newly enqueued message on local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (m.getSequence() <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.getSequence()));
+ if (m.getSequence() - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ // Send the position before message was enqueued.
+ sendPositionEvent(m.getSequence()-1, l);
}
+ // Backup will automatically advance by 1 on delivery of message.
+ backupPosition = m.getSequence();
}
- return ConsumerImpl::deliver(qm);
+ return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
<< ": " << e.what());
throw;
}
@@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel()
}
// Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
- if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
- // Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
- guard->complete(qm);
- // If next message is protected by the guard then we are ready
- if (qm.position >= guard->getRange().back) setReady();
- }
- ConsumerImpl::acknowledged(qm);
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ guard->complete(r.getMessageId());
+ // If next message is protected by the guard then we are ready
+ if (r.getMessageId() >= guard->getRange().back) setReady();
+ ConsumerImpl::acknowledged(r);
}
// Called with lock held. Called in subscription's connection thread.
@@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeu
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+void ReplicatingSubscription::dequeued(const Message& m)
{
- assert (qm.queue == getQueue().get());
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
{
Mutex::ScopedLock l(lock);
- dequeues.add(qm.position);
+ dequeues.add(m.getSequence());
}
notify(); // Ensure a call to doDispatch
}
@@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositi
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
- boost::intrusive_ptr<Message> event = new Message();
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
@@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
// Send the event directly to the base consumer implementation.
- // We don't really need a queue here but we pass a dummy queue
- // to conform to the consumer API.
- QueuedMessage qm(dummy.get(), event);
- ConsumerImpl::deliver(qm);
+ // Passing an empty (dummy) consumer pointer prevents acknowledgements being handled, which is what we want for events.
+ ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug 10 12:04:27 2012
@@ -101,15 +101,15 @@ class ReplicatingSubscription : public b
// Called via QueueGuard::dequeued.
//@return true if the message requires completion.
- void dequeued(const broker::QueuedMessage& qm);
+ void dequeued(const broker::Message&);
// Called during initial scan for dequeues.
void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
// Consumer overrides.
- bool deliver(broker::QueuedMessage& msg);
+ bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
void cancel();
- void acknowledged(const broker::QueuedMessage&);
+ void acknowledged(const broker::DeliveryRecord&);
bool browseAcquired() const { return true; }
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
@@ -127,8 +127,8 @@ class ReplicatingSubscription : public b
private:
std::string logPrefix;
- boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
framing::SequenceSet dequeues;
+ framing::SequenceNumber position;
framing::SequenceNumber backupPosition;
bool ready;
BrokerInfo info;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org