You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [7/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
@@ -31,12 +32,26 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Logger.h"
@@ -44,7 +59,9 @@
#include "qpid/log/Statement.h"
#include "qpid/log/posix/SinkOptions.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/sys/Poller.h"
@@ -76,7 +93,10 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::management::getManagementExecutionContext;
+using qpid::types::Variant;
using std::string;
+using std::make_pair;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -103,7 +123,12 @@ Broker::Options::Options(const std::stri
maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
qmf2Support(true),
- qmf1Support(true)
+ qmf1Support(true),
+ queueFlowStopRatio(80),
+ queueFlowResumeRatio(70),
+ queueThresholdEventRatio(80),
+ defaultMsgGroup("qpid.no-group"),
+ timestampRcvMsgs(false) // set the 0.10 timestamp delivery property
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -134,9 +159,14 @@ Broker::Options::Options(const std::stri
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
- ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
+ ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location")
("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
- ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+ ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+ ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
+ ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+ ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+ ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
}
const std::string empty;
@@ -166,9 +196,10 @@ Broker::Broker(const Broker::Options& co
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, timer),
- queueEvents(poller,!conf.asyncQueueEvents),
+ queueCleaner(queues, &timer),
+ queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ inCluster(false),
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
@@ -225,8 +256,11 @@ Broker::Broker(const Broker::Options& co
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
+ QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+ MessageGroupManager::setDefaults(conf.defaultMsgGroup);
+
// If no plugin store module registered itself, set up the null store.
- if (NullMessageStore::isNullStore(store.get()))
+ if (NullMessageStore::isNullStore(store.get()))
setStore();
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -271,6 +305,11 @@ Broker::Broker(const Broker::Options& co
else
QPID_LOG(info, "Management not enabled");
+ // this feature affects performance, so let's be sure that gets logged!
+ if (conf.timestampRcvMsgs) {
+ QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+ }
+
/**
* SASL setup, can fail and terminate startup
*/
@@ -345,14 +384,14 @@ void Broker::run() {
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
-
+
// Run n-1 io threads
for (int i=0; i<numIOThreads-1; ++i)
t[i] = Thread(d);
-
+
// Run final thread
d.run();
-
+
// Now wait for n-1 io threads to exit
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
@@ -399,9 +438,9 @@ Manageable::status_t Broker::ManagementM
{
case _qmf::Broker::METHOD_ECHO :
QPID_LOG (debug, "Broker::echo("
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
- << ", "
- << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+ << ", "
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
<< ")");
status = Manageable::STATUS_OK;
break;
@@ -409,8 +448,9 @@ Manageable::status_t Broker::ManagementM
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
- QPID_LOG (debug, "Broker::connect()");
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
+ QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
+ "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
return Manageable::STATUS_NOT_IMPLEMENTED;
@@ -427,9 +467,9 @@ Manageable::status_t Broker::ManagementM
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
status = Manageable::STATUS_OK;
- else
+ else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
@@ -443,6 +483,38 @@ Manageable::status_t Broker::ManagementM
QPID_LOG (debug, "Broker::getLogLevel()");
status = Manageable::STATUS_OK;
break;
+ case _qmf::Broker::METHOD_CREATE :
+ {
+ _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
+ createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_DELETE :
+ {
+ _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
+ deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext());
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_QUERY :
+ {
+ _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
+ status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+ status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+ break;
+ }
+ case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+ {
+ _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+ status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+ break;
+ }
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,6 +524,240 @@ Manageable::status_t Broker::ManagementM
return status;
}
+namespace
+{
+const std::string TYPE_QUEUE("queue");
+const std::string TYPE_EXCHANGE("exchange");
+const std::string TYPE_TOPIC("topic");
+const std::string TYPE_BINDING("binding");
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QUEUE_NAME("queue");
+const std::string EXCHANGE_NAME("exchange");
+
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
+const std::string _TRUE("true");
+const std::string _FALSE("false");
+}
+
+struct InvalidBindingIdentifier : public qpid::Exception
+{
+ InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
+ std::string getPrefix() const { return "invalid binding"; }
+};
+
+struct BindingIdentifier
+{
+ std::string exchange;
+ std::string queue;
+ std::string key;
+
+ BindingIdentifier(const std::string& name)
+ {
+ std::vector<std::string> path;
+ split(path, name, "/");
+ switch (path.size()) {
+ case 1:
+ queue = path[0];
+ break;
+ case 2:
+ exchange = path[0];
+ queue = path[1];
+ break;
+ case 3:
+ exchange = path[0];
+ queue = path[1];
+ key = path[2];
+ break;
+ default:
+ throw InvalidBindingIdentifier(name);
+ }
+ }
+};
+
+struct ObjectAlreadyExists : public qpid::Exception
+{
+ ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
+ std::string getPrefix() const { return "object already exists"; }
+};
+
+struct UnknownObjectType : public qpid::Exception
+{
+ UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
+ std::string getPrefix() const { return "unknown object type"; }
+};
+
+void Broker::createObject(const std::string& type, const std::string& name,
+ const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ //TODO: implement 'strict' option (check there are no unrecognised properties)
+ QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
+ if (type == TYPE_QUEUE) {
+ bool durable(false);
+ bool autodelete(false);
+ std::string alternateExchange;
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract durable, auto-delete and alternate-exchange properties
+ if (i->first == DURABLE) durable = i->second;
+ else if (i->first == AUTO_DELETE) autodelete = i->second;
+ else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+ if (!result.second) {
+ throw ObjectAlreadyExists(name);
+ }
+ } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+ bool durable(false);
+ std::string exchangeType("topic");
+ std::string alternateExchange;
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract durable, exchange-type and alternate-exchange properties
+ if (i->first == DURABLE) durable = i->second;
+ else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+ else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ try {
+ std::pair<boost::shared_ptr<Exchange>, bool> result =
+ createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+ if (!result.second) {
+ throw ObjectAlreadyExists(name);
+ }
+ } catch (const UnknownExchangeTypeException&) {
+ throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
+ }
+ } else if (type == TYPE_BINDING) {
+ BindingIdentifier binding(name);
+ std::string exchangeType("topic");
+ Variant::Map extensions;
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ // extract exchange-type (currently not passed on to bind())
+ if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+ //treat everything else as extension properties
+ else extensions[i->first] = i->second;
+ }
+ framing::FieldTable arguments;
+ amqp_0_10::translate(extensions, arguments);
+
+ bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+ } else {
+ throw UnknownObjectType(type);
+ }
+}
+
+void Broker::deleteObject(const std::string& type, const std::string& name,
+ const Variant::Map& options, const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
+ if (type == TYPE_QUEUE) {
+ deleteQueue(name, userId, connectionId);
+ } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+ deleteExchange(name, userId, connectionId);
+ } else if (type == TYPE_BINDING) {
+ BindingIdentifier binding(name);
+ unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ } else {
+ throw UnknownObjectType(type);
+ }
+
+}
+
+Manageable::status_t Broker::queryObject(const std::string& type,
+ const std::string& name,
+ Variant::Map& results,
+ const ConnectionState* context)
+{
+ std::string userId;
+ std::string connectionId;
+ if (context) {
+ userId = context->getUserId();
+ connectionId = context->getUrl();
+ }
+ QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
+
+ if (type == TYPE_QUEUE)
+ return queryQueue( name, userId, connectionId, results );
+
+ if (type == TYPE_EXCHANGE ||
+ type == TYPE_TOPIC ||
+ type == TYPE_BINDING)
+ return Manageable::STATUS_NOT_IMPLEMENTED;
+
+ throw UnknownObjectType(type);
+}
+
+Manageable::status_t Broker::queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& /*connectionId*/,
+ Variant::Map& results )
+{
+ (void) results;
+ if (acl) {
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
+ }
+
+ boost::shared_ptr<Queue> q(queues.find(name));
+ if (!q) {
+ QPID_LOG(error, "Query failed: queue not found, name=" << name);
+ return Manageable::STATUS_UNKNOWN_OBJECT;
+ }
+ q->query( results );
+ return Manageable::STATUS_OK;;
+}
+
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+ }
+ receive = config.timestampRcvMsgs;
+ return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+ const ConnectionState* context)
+{
+ std::string name; // none needed for broker
+ std::string userId = context->getUserId();
+ if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+ }
+ config.timestampRcvMsgs = receive;
+ QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+ return Manageable::STATUS_OK;
+}
+
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
@@ -466,7 +772,7 @@ std::string Broker::getLogLevel()
const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
if (i != selectors.begin()) level += std::string(",");
- level += *i;
+ level += *i;
}
return level;
}
@@ -499,7 +805,7 @@ void Broker::accept() {
}
void Broker::connect(
- const std::string& host, uint16_t port, const std::string& transport,
+ const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* f)
{
@@ -515,13 +821,14 @@ void Broker::connect(
{
url.throwIfEmpty();
const Address& addr=url[0];
- connect(addr.host, addr.port, addr.protocol, failed, f);
+ connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
}
uint32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty)
+ uint32_t qty,
+ const Variant::Map& filter)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
@@ -530,7 +837,7 @@ uint32_t Broker::queueMoveMessages(
if (!dest_queue)
return 0;
- return src_queue->move(dest_queue, qty);
+ return src_queue->move(dest_queue, qty, &filter);
}
@@ -548,9 +855,228 @@ bool Broker::deferDeliveryImpl(const std
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
+ queueCleaner.setTimer(clusterTimer.get());
+ dtxManager.setTimer(*clusterTimer.get());
}
const std::string Broker::TCP_TRANSPORT("tcp");
+
+std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const OwnershipToken* owner,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+ params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+ params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+ if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+ }
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = exchanges.get(alternateExchange);
+ if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+ }
+
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments);
+ if (result.second) {
+ //add default binding:
+ result.first->bind(exchanges.getDefault(), name);
+
+ if (managementAgent.get()) {
+ //TODO: debatable whether we should raise an event here for
+ //create when this is a 'declare' event; ideally add a create
+ //event instead?
+ managementAgent->raiseEvent(
+ _qmf::EventQueueDeclare(connectionId, userId, name,
+ durable, owner, autodelete,
+ ManagementAgent::toMap(arguments),
+ "created"));
+ }
+ }
+ return result;
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+ const std::string& connectionId, QueueFunctor check)
+{
+ if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+ }
+
+ Queue::shared_ptr queue = queues.find(name);
+ if (queue) {
+ if (check) check(queue);
+ queues.destroy(name);
+ queue->destroyed();
+ } else {
+ throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
+ }
+
+ if (managementAgent.get())
+ managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+
+}
+
+std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_TYPE, type));
+ params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
+ params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
+ }
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = exchanges.get(alternateExchange);
+ if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+ }
+
+ std::pair<Exchange::shared_ptr, bool> result;
+ result = exchanges.declare(name, type, durable, arguments);
+ if (result.second) {
+ if (alternate) {
+ result.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ if (durable) {
+ store->create(*result.first, arguments);
+ }
+ if (managementAgent.get()) {
+ //TODO: debatable whether we should raise an event here for
+ //create when this is a 'declare' event; ideally add a create
+ //event instead?
+ managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
+ userId,
+ name,
+ type,
+ alternateExchange,
+ durable,
+ false,
+ ManagementAgent::toMap(arguments),
+ "created"));
+ }
+ }
+ return result;
+}
+
+void Broker::deleteExchange(const std::string& name, const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
+ }
+
+ if (name.empty()) {
+ throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange"));
+ }
+ Exchange::shared_ptr exchange(exchanges.get(name));
+ if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->isDurable()) store->destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+ exchanges.destroy(name);
+
+ if (managementAgent.get())
+ managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+
+}
+
+void Broker::bind(const std::string& queueName,
+ const std::string& exchangeName,
+ const std::string& key,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+
+ if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms))
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
+ }
+ if (exchangeName.empty()) {
+ throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange"));
+ }
+
+ Queue::shared_ptr queue = queues.find(queueName);
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ if (!queue) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+ } else if (!exchange) {
+ throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+ } else {
+ if (queue->bind(exchange, key, arguments)) {
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
+ queueName, key, ManagementAgent::toMap(arguments)));
+ }
+ }
+ }
+}
+
+void Broker::unbind(const std::string& queueName,
+ const std::string& exchangeName,
+ const std::string& key,
+ const std::string& userId,
+ const std::string& connectionId)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+ params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+ if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
+ }
+ if (exchangeName.empty()) {
+ throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange"));
+ }
+ Queue::shared_ptr queue = queues.find(queueName);
+ Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+ if (!queue) {
+ throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName));
+ } else if (!exchange) {
+ throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
+ } else {
+ if (exchange->unbind(queue, key, 0)) {
+ if (exchange->isDurable() && queue->isDurable()) {
+ store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ }
+ if (managementAgent.get()) {
+ managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
+ }
+ }
+ }
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,6 +49,7 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Timer.h"
+#include "qpid/types/Variant.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
@@ -57,7 +58,7 @@
#include <string>
#include <vector>
-namespace qpid {
+namespace qpid {
namespace sys {
class ProtocolFactory;
@@ -68,6 +69,7 @@ struct Url;
namespace broker {
+class ConnectionState;
class ExpiryPolicy;
class Message;
@@ -80,7 +82,7 @@ struct NoSuchTransportException : qpid::
};
/**
- * A broker instance.
+ * A broker instance.
*/
class Broker : public sys::Runnable, public Plugin::Target,
public management::Manageable,
@@ -116,29 +118,34 @@ public:
bool asyncQueueEvents;
bool qmf2Support;
bool qmf1Support;
+ uint queueFlowStopRatio; // producer flow control: on
+ uint queueFlowResumeRatio; // producer flow control: off
+ uint16_t queueThresholdEventRatio;
+ std::string defaultMsgGroup;
+ bool timestampRcvMsgs;
private:
std::string getHome();
};
-
+
class ConnectionCounter {
int maxConnections;
int connectionCount;
sys::Mutex connectionCountLock;
public:
ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ void inc_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ }
+ void dec_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
connectionCount--;
}
bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
return (maxConnections <= connectionCount);
- }
+ }
};
private:
@@ -148,7 +155,20 @@ public:
void setStore ();
void setLogLevel(const std::string& level);
std::string getLogLevel();
-
+ void createObject(const std::string& type, const std::string& name,
+ const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
+ void deleteObject(const std::string& type, const std::string& name,
+ const qpid::types::Variant::Map& options, const ConnectionState* context);
+ Manageable::status_t queryObject(const std::string& type, const std::string& name,
+ qpid::types::Variant::Map& results, const ConnectionState* context);
+ Manageable::status_t queryQueue( const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ qpid::types::Variant::Map& results);
+ Manageable::status_t getTimestampConfig(bool& receive,
+ const ConnectionState* context);
+ Manageable::status_t setTimestampConfig(const bool receive,
+ const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
std::auto_ptr<sys::Timer> clusterTimer;
@@ -176,10 +196,10 @@ public:
const boost::intrusive_ptr<Message>& msg);
std::string federationTag;
bool recovery;
- bool clusterUpdatee;
+ bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
-
+
public:
virtual ~Broker();
@@ -235,7 +255,7 @@ public:
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, uint16_t port,
+ void connect(const std::string& host, const std::string& port,
const std::string& transport,
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* =0);
@@ -247,9 +267,10 @@ public:
/** Move messages from one queue to another.
A zero quantity means to move all messages
*/
- uint32_t queueMoveMessages( const std::string& srcQueue,
+ uint32_t queueMoveMessages( const std::string& srcQueue,
const std::string& destQueue,
- uint32_t qty);
+ uint32_t qty,
+ const qpid::types::Variant::Map& filter);
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
@@ -273,11 +294,20 @@ public:
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ /** True if this broker is part of a cluster.
+ * Only valid after early initialization of plugins is complete.
+ */
+ bool isInCluster() const { return inCluster; }
+ void setInCluster(bool set) { inCluster = set; }
+
+ /** True if this broker is joining a cluster and in the process of
+ * receiving a state update.
+ */
bool isClusterUpdatee() const { return clusterUpdatee; }
+ void setClusterUpdatee(bool set) { clusterUpdatee = set; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-
+
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
/**
@@ -290,6 +320,43 @@ public:
const boost::intrusive_ptr<Message>& msg)> deferDelivery;
bool isAuthenticating ( ) { return config.auth; }
+ bool isTimestamping() { return config.timestampRcvMsgs; }
+
+ typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
+
+ std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+ const std::string& name,
+ bool durable,
+ bool autodelete,
+ const OwnershipToken* owner,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId);
+ void deleteQueue(const std::string& name,
+ const std::string& userId,
+ const std::string& connectionId,
+ QueueFunctor check = QueueFunctor());
+ std::pair<Exchange::shared_ptr, bool> createExchange(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const std::string& alternateExchange,
+ const qpid::framing::FieldTable& args,
+ const std::string& userId, const std::string& connectionId);
+ void deleteExchange(const std::string& name, const std::string& userId,
+ const std::string& connectionId);
+ void bind(const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const qpid::framing::FieldTable& arguments,
+ const std::string& userId,
+ const std::string& connectionId);
+ void unbind(const std::string& queue,
+ const std::string& exchange,
+ const std::string& key,
+ const std::string& userId,
+ const std::string& connectionId);
};
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h Fri Oct 21 14:42:12 2011
@@ -20,14 +20,23 @@
* under the License.
*/
-#if defined(WIN32) && !defined(QPID_BROKER_STATIC)
-#if defined(BROKER_EXPORT) || defined (qpidbroker_EXPORTS)
-#define QPID_BROKER_EXTERN __declspec(dllexport)
+#if defined(WIN32) && !defined(QPID_DECLARE_STATIC)
+# if defined(BROKER_EXPORT) || defined (qpidbroker_EXPORTS)
+# define QPID_BROKER_EXTERN __declspec(dllexport)
+# else
+# define QPID_BROKER_EXTERN __declspec(dllimport)
+# endif
+# ifdef _MSC_VER
+# define QPID_BROKER_CLASS_EXTERN
+# define QPID_BROKER_INLINE_EXTERN QPID_BROKER_EXTERN
+# else
+# define QPID_BROKER_CLASS_EXTERN QPID_BROKER_EXTERN
+# define QPID_BROKER_INLINE_EXTERN
+# endif
#else
-#define QPID_BROKER_EXTERN __declspec(dllimport)
-#endif
-#else
-#define QPID_BROKER_EXTERN
+# define QPID_BROKER_EXTERN
+# define QPID_BROKER_CLASS_EXTERN
+# define QPID_BROKER_INLINE_EXTERN
#endif
#endif
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp Fri Oct 21 14:42:12 2011
@@ -156,16 +156,7 @@ Connection::~Connection()
void Connection::received(framing::AMQFrame& frame) {
// Received frame on connection so delay timeout
restartTimeout();
-
- if (frame.getChannel() == 0 && frame.getMethod()) {
- adapter.handle(frame);
- } else {
- if (adapter.isOpen())
- getChannel(frame.getChannel()).in(frame);
- else
- close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received.");
- }
-
+ adapter.handle(frame);
if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
@@ -278,8 +269,7 @@ void Connection::setUserId(const string&
ConnectionState::setUserId(userId);
// In a cluster, the cluster code will raise the connect event
// when the connection is replicated to the cluster.
- if (!sys::isCluster())
- raiseConnectEvent();
+ if (!broker.isInCluster()) raiseConnectEvent();
}
void Connection::raiseConnectEvent() {
@@ -289,11 +279,11 @@ void Connection::raiseConnectEvent() {
}
}
-void Connection::setFederationLink(bool b)
+void Connection::setUserProxyAuth(bool b)
{
- ConnectionState::setFederationLink(b);
+ ConnectionState::setUserProxyAuth(b);
if (mgmtObject != 0)
- mgmtObject->set_federationLink(b);
+ mgmtObject->set_userProxyAuth(b);
}
void Connection::close(connection::CloseCode code, const string& text)
@@ -332,31 +322,30 @@ void Connection::closed(){ // Physically
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
- while (!exclusiveQueues.empty()) {
- boost::shared_ptr<Queue> q(exclusiveQueues.front());
- q->releaseExclusiveOwnership();
- if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
- }
- exclusiveQueues.erase(exclusiveQueues.begin());
- }
} catch(std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
assert(0);
}
}
+void Connection::doIoCallbacks() { // Drains ioCallbacks, invoking each queued callback on the calling thread.
+ {
+ ScopedLock<Mutex> l(ioCallbackLock);
+ // Although IO callbacks execute in the connection thread context, they are
+ // not cluster safe because they are queued for execution in non-IO threads.
+ ClusterUnsafeScope cus;
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front(); // copy the callback out before it is popped
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock); // NOTE(review): presumably drops ioCallbackLock while cb() runs - confirm ScopedUnlock semantics
+ cb(); // Lend the IO thread for management processing
+ }
+ }
+}
+
bool Connection::doOutput() {
try {
- {
- ScopedLock<Mutex> l(ioCallbackLock);
- while (!ioCallbacks.empty()) {
- boost::function0<void> cb = ioCallbacks.front();
- ioCallbacks.pop();
- ScopedUnlock<Mutex> ul(ioCallbackLock);
- cb(); // Lend the IO thread for management processing
- }
- }
+ doIoCallbacks();
if (mgmtClosing) {
closed();
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
@@ -476,8 +465,8 @@ void Connection::OutboundFrameTracker::a
void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
-{
- next->send(f);
+{
+ next->send(f);
con.sent(f);
}
void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h Fri Oct 21 14:42:12 2011
@@ -125,7 +125,7 @@ class Connection : public sys::Connectio
const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
- void setFederationLink(bool b);
+ void setUserProxyAuth(bool b);
/** Connection does not delete the listener. 0 resets. */
void setErrorListener(ErrorListener* l) { errorListener=l; }
ErrorListener* getErrorListener() { return errorListener; }
@@ -153,13 +153,16 @@ class Connection : public sys::Connectio
void addManagementObject();
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
- {
+ {
return securitySettings;
}
/** @return true if the initial connection negotiation is complete. */
bool isOpen();
+ // Used by cluster during catch-up, see cluster::OutputInterceptor
+ void doIoCallbacks();
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
@@ -201,7 +204,7 @@ class Connection : public sys::Connectio
sys::ConnectionOutputHandler* next;
};
OutboundFrameTracker outboundTracker;
-
+
void sent(const framing::AMQFrame& f);
public:
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Oct 21 14:42:12 2011
@@ -26,6 +26,7 @@
#include "qpid/broker/SecureConnection.h"
#include "qpid/Url.h"
#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/SecurityLayer.h"
@@ -63,13 +64,31 @@ void ConnectionHandler::heartbeat()
handler->proxy.heartbeat();
}
+bool ConnectionHandler::handle(const framing::AMQMethodBody& method) // Dispatches one method body to handler; returns true for start-ok, otherwise the result of invoke().
+{
+ //Need special handling for start-ok, in order to distinguish
+ //between null and empty response
+ if (method.isA<ConnectionStartOkBody>()) {
+ handler->startOk(dynamic_cast<const ConnectionStartOkBody&>(method)); // hand over the whole body instead of its unpacked fields
+ return true;
+ } else {
+ return invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), method); // every other method goes through the generic invoker
+ }
+}
+
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
- if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
+ if (method && handle(*method)) {
+ // This is a connection control frame, nothing more to do.
+ } else if (isOpen()) {
handler->connection.getChannel(frame.getChannel()).in(frame);
+ } else {
+ handler->proxy.close(
+ connection::CLOSE_CODE_FRAMING_ERROR,
+ "Connection not yet open, invalid frame received.");
}
}catch(ConnectionException& e){
if (errorListener) errorListener->connectionError(e.what());
@@ -89,13 +108,10 @@ ConnectionHandler::ConnectionHandler(Con
ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
proxy(c.getOutput()),
- connection(c), serverMode(!isClient), acl(0), secured(0),
+ connection(c), serverMode(!isClient), secured(0),
isOpen(false)
{
if (serverMode) {
-
- acl = connection.getBroker().getAcl();
-
FieldTable properties;
Array mechanisms(0x95);
@@ -118,13 +134,20 @@ ConnectionHandler::Handler::Handler(Conn
ConnectionHandler::Handler::~Handler() {}
-void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProperties,
- const string& mechanism,
- const string& response,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
+ const string& /*response*/,
const string& /*locale*/)
{
+ //Need special handling for start-ok, in order to distinguish
+ //between null and empty response, so this overload must never be called
+ assert(false);
+}
+
+void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
+{
try {
- authenticator->start(mechanism, response);
+ authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
} catch (std::exception& /*e*/) {
management::ManagementAgent* agent = connection.getAgent();
if (agent) {
@@ -136,9 +159,14 @@ void ConnectionHandler::Handler::startOk
}
throw;
}
+ const framing::FieldTable& clientProperties = body.getClientProperties();
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
- connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ if (clientProperties.isSet(QPID_FED_TAG)) {
+ connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ }
if (connection.isFederationLink()) {
+ AclModule* acl = connection.getBroker().getAcl();
+ FieldTable properties;
if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
return;
@@ -183,7 +211,7 @@ void ConnectionHandler::Handler::secureO
void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
uint16_t framemax, uint16_t heartbeat)
{
- connection.setFrameMax(framemax);
+ if (framemax) connection.setFrameMax(framemax);
connection.setHeartbeatInterval(heartbeat);
}
@@ -256,7 +284,6 @@ void ConnectionHandler::Handler::start(c
false ); // disallow interaction
}
std::string supportedMechanismsList;
- bool requestedMechanismIsSupported = false;
Array::const_iterator i;
/*
@@ -269,11 +296,9 @@ void ConnectionHandler::Handler::start(c
if (i != supportedMechanisms.begin())
supportedMechanismsList += SPACE;
supportedMechanismsList += (*i)->get<std::string>();
- requestedMechanismIsSupported = true;
}
}
else {
- requestedMechanismIsSupported = false;
/*
The caller has requested a mechanism. If it's available,
make sure it ends up at the head of the list.
@@ -282,7 +307,6 @@ void ConnectionHandler::Handler::start(c
string currentMechanism = (*i)->get<std::string>();
if ( requestedMechanism == currentMechanism ) {
- requestedMechanismIsSupported = true;
supportedMechanismsList = currentMechanism + SPACE + supportedMechanismsList;
} else {
if (i != supportedMechanisms.begin())
@@ -292,7 +316,9 @@ void ConnectionHandler::Handler::start(c
}
}
- connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+ if (serverProperties.isSet(QPID_FED_TAG)) {
+ connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+ }
FieldTable ft;
ft.setInt(QPID_FED_LINK,1);
@@ -301,11 +327,21 @@ void ConnectionHandler::Handler::start(c
string response;
if (sasl.get()) {
const qpid::sys::SecuritySettings& ss = connection.getExternalSecuritySettings();
- response = sasl->start ( requestedMechanism.empty()
- ? supportedMechanismsList
- : requestedMechanism,
- & ss );
- proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+ if (sasl->start ( requestedMechanism.empty()
+ ? supportedMechanismsList
+ : requestedMechanism,
+ response,
+ & ss )) {
+ proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+ } else {
+ //response was null
+ ConnectionStartOkBody body;
+ body.setClientProperties(ft);
+ body.setMechanism(sasl->getMechanism());
+ //Don't set response, as none was given
+ body.setLocale(en_US);
+ proxy.send(body);
+ }
}
else {
response = ((char)0) + username + ((char)0) + password;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h Fri Oct 21 14:42:12 2011
@@ -26,8 +26,10 @@
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AMQP_AllProxy.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
@@ -57,12 +59,12 @@ class ConnectionHandler : public framing
Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
- AclModule* acl;
SecureConnection* secured;
bool isOpen;
Handler(Connection& connection, bool isClient, bool isShadow=false);
~Handler();
+ void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
const std::string& mechanism, const std::string& response,
const std::string& locale);
@@ -96,7 +98,7 @@ class ConnectionHandler : public framing
};
std::auto_ptr<Handler> handler;
-
+ bool handle(const qpid::framing::AMQMethodBody& method);
public:
ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
void close(framing::connection::CloseCode code, const std::string& text);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h Fri Oct 21 14:42:12 2011
@@ -46,6 +46,7 @@ class ConnectionState : public Connectio
framemax(65535),
heartbeat(0),
heartbeatmax(120),
+ userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
federationLink(true),
clientSupportsThrottling(false),
clusterOrderOut(0)
@@ -67,8 +68,10 @@ class ConnectionState : public Connectio
void setUrl(const std::string& _url) { url = _url; }
const std::string& getUrl() const { return url; }
- void setFederationLink(bool b) { federationLink = b; }
- bool isFederationLink() const { return federationLink; }
+ void setUserProxyAuth(const bool b) { userProxyAuth = b; }
+ bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
+ void setFederationLink(bool b) { federationLink = b; } // deprecated - use setFederationPeerTag() instead
+ bool isFederationLink() const { return federationPeerTag.size() > 0; }
void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
const std::string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
@@ -79,7 +82,6 @@ class ConnectionState : public Connectio
Broker& getBroker() { return broker; }
Broker& broker;
- std::vector<boost::shared_ptr<Queue> > exclusiveQueues;
//contained output tasks
sys::AggregateOutput outputTasks;
@@ -106,6 +108,7 @@ class ConnectionState : public Connectio
uint16_t heartbeatmax;
std::string userId;
std::string url;
+ bool userProxyAuth;
bool federationLink;
std::string federationPeerTag;
std::vector<Url> knownHosts;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h Fri Oct 21 14:42:12 2011
@@ -29,22 +29,33 @@ namespace qpid {
namespace broker {
class Queue;
+class QueueListeners;
class Consumer {
const bool acquires;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
+ // inListeners allows QueueListeners to efficiently track if this instance is registered
+ // for notifications without having to search its containers
+ bool inListeners;
+ // the name is generated by broker and is unique within broker scope. It is not
+ // provided or known by the remote Consumer.
+ const std::string name;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+
+ Consumer(const std::string& _name, bool preAcquires = true)
+ : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
bool preAcquires() const { return acquires; }
+ const std::string& getName() const { return name; }
+
virtual bool deliver(QueuedMessage& msg) = 0;
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
virtual OwnershipToken* getSession() = 0;
virtual ~Consumer(){}
+ friend class QueueListeners;
};
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp Fri Oct 21 14:42:12 2011
@@ -93,11 +93,10 @@ void Daemon::fork()
catch (const exception& e) {
QPID_LOG(critical, "Unexpected error: " << e.what());
uint16_t port = 0;
- int unused_ret; //Supress warning about ignoring return value.
- unused_ret = write(pipeFds[1], &port, sizeof(uint16_t));
+ (void) write(pipeFds[1], &port, sizeof(uint16_t));
std::string pipeFailureMessage = e.what();
- unused_ret = write ( pipeFds[1],
+ (void) write ( pipeFds[1],
pipeFailureMessage.c_str(),
strlen(pipeFailureMessage.c_str())
);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
namespace qpid {
namespace broker {
- class DeliverableMessage : public Deliverable{
+ class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable{
boost::intrusive_ptr<Message> msg;
public:
QPID_BROKER_EXTERN DeliverableMessage(const boost::intrusive_ptr<Message>& msg);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 21 14:42:12 2011
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::Fr
{
id = deliveryId;
if (msg.payload->getRedelivered()){
- msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ msg.payload->setRedelivered();
}
msg.payload->adjustTtl();
@@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{
void DeliveryRecord::reject()
{
- Exchange::shared_ptr alternate = queue->getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ if (acquired && !ended) {
+ Exchange::shared_ptr alternate = queue->getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->routeWithAlternate(delivery);
+ QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ }
+ dequeue();
+ setEnded();
}
-
- dequeue();
}
uint32_t DeliveryRecord::getCredit() const
@@ -151,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() con
}
void DeliveryRecord::acquire(DeliveryIds& results) {
- if (queue->acquire(msg)) {
+ if (queue->acquire(msg, tag)) {
acquired = true;
results.push_back(id);
if (!acceptExpected) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 21 14:42:12 2011
@@ -46,7 +46,7 @@ class DeliveryRecord
{
QueuedMessage msg;
mutable boost::shared_ptr<Queue> queue;
- std::string tag;
+ std::string tag; // name of consumer
DeliveryId id;
bool acquired : 1;
bool acceptExpected : 1;
@@ -90,7 +90,7 @@ class DeliveryRecord
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
- bool isRedundant() const { return ended && (!windowing || completed); }
+ bool isRedundant() const { return ended && (!windowing || completed || cancelled); }
bool isCancelled() const { return cancelled; }
bool isAccepted() const { return !acceptExpected; }
bool isEnded() const { return ended; }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp Fri Oct 21 14:42:12 2011
@@ -94,7 +94,7 @@ bool DirectExchange::bind(Queue::shared_
propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
- unbind(queue, routingKey, 0);
+ unbind(queue, routingKey, args);
} else if (fedOp == fedOpReorigin) {
/** gather up all the keys that need rebinding in a local vector
@@ -124,20 +124,24 @@ bool DirectExchange::bind(Queue::shared_
return true;
}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
{
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
- QPID_LOG(debug, "Unbind key [" << routingKey << "] from queue " << queue->getName());
-
+ QPID_LOG(debug, "Unbinding key [" << routingKey << "] from queue " << queue->getName()
+ << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
{
Mutex::ScopedLock l(lock);
BoundKey& bk = bindings[routingKey];
if (bk.queues.remove_if(MatchQueue(queue))) {
- propagate = bk.fedBinding.delOrigin();
+ propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
}
+ if (bk.queues.empty()) {
+ bindings.erase(routingKey);
+ }
} else {
return false;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp Fri Oct 21 14:42:12 2011
@@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::Sequ
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
}
+DtxAck::DtxAck(DeliveryRecords& unacked) { // Builds a DtxAck whose pending set is the whole of unacked.
+ pending = unacked; // assigns all of unacked to pending; the argument itself is not modified here
+}
+
bool DtxAck::prepare(TransactionContext* ctxt) throw()
{
try{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h Fri Oct 21 14:42:12 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DTXACK_H
+#define QPID_BROKER_DTXACK_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +21,6 @@
* under the License.
*
*/
-#ifndef _DtxAck_
-#define _DtxAck_
-
#include <algorithm>
#include <functional>
#include <list>
@@ -29,20 +29,21 @@
#include "qpid/broker/TxOp.h"
namespace qpid {
- namespace broker {
- class DtxAck : public TxOp{
- DeliveryRecords pending;
+namespace broker {
+class DtxAck : public TxOp{
+ DeliveryRecords pending;
- public:
- DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~DtxAck(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
- };
- }
-}
+ public:
+ DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+ DtxAck(DeliveryRecords& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DtxAck(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+ const DeliveryRecords& getPending() const { return pending; }
+};
+}} // qpid::broker
-#endif
+#endif /*!QPID_BROKER_DTXACK_H*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp Fri Oct 21 14:42:12 2011
@@ -23,8 +23,11 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer(const std::string& _xid)
- : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
+DtxBuffer::DtxBuffer(
+ const std::string& _xid,
+ bool ended_, bool suspended_, bool failed_, bool expired_)
+ : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_)
+{}
DtxBuffer::~DtxBuffer() {}
@@ -34,7 +37,7 @@ void DtxBuffer::markEnded()
ended = true;
}
-bool DtxBuffer::isEnded()
+bool DtxBuffer::isEnded() const
{
Mutex::ScopedLock locker(lock);
return ended;
@@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSusp
suspended = isSuspended;
}
-bool DtxBuffer::isSuspended()
+bool DtxBuffer::isSuspended() const
{
return suspended;
}
@@ -58,13 +61,13 @@ void DtxBuffer::fail()
ended = true;
}
-bool DtxBuffer::isRollbackOnly()
+bool DtxBuffer::isRollbackOnly() const
{
Mutex::ScopedLock locker(lock);
return failed;
}
-const std::string& DtxBuffer::getXid()
+std::string DtxBuffer::getXid() const
{
return xid;
}
@@ -76,8 +79,13 @@ void DtxBuffer::timedout()
fail();
}
-bool DtxBuffer::isExpired()
+bool DtxBuffer::isExpired() const
{
Mutex::ScopedLock locker(lock);
return expired;
}
+
+bool DtxBuffer::isFailed() const // Returns the current value of the failed flag.
+{
+ return failed; // NOTE(review): read without taking lock, whereas isRollbackOnly() locks before returning the same member - confirm this is intended
+}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,31 +26,34 @@
#include "qpid/sys/Mutex.h"
namespace qpid {
- namespace broker {
- class DtxBuffer : public TxBuffer{
- sys::Mutex lock;
- const std::string xid;
- bool ended;
- bool suspended;
- bool failed;
- bool expired;
-
- public:
- typedef boost::shared_ptr<DtxBuffer> shared_ptr;
-
- QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = "");
- QPID_BROKER_EXTERN ~DtxBuffer();
- QPID_BROKER_EXTERN void markEnded();
- bool isEnded();
- void setSuspended(bool suspended);
- bool isSuspended();
- void fail();
- bool isRollbackOnly();
- void timedout();
- bool isExpired();
- const std::string& getXid();
- };
- }
+namespace broker {
+class DtxBuffer : public TxBuffer{
+ mutable sys::Mutex lock;
+ const std::string xid;
+ bool ended;
+ bool suspended;
+ bool failed;
+ bool expired;
+
+ public:
+ typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+ QPID_BROKER_EXTERN DtxBuffer(
+ const std::string& xid = "",
+ bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
+ QPID_BROKER_EXTERN ~DtxBuffer();
+ QPID_BROKER_EXTERN void markEnded();
+ bool isEnded() const;
+ void setSuspended(bool suspended);
+ bool isSuspended() const;
+ void fail();
+ bool isRollbackOnly() const;
+ void timedout();
+ bool isExpired() const;
+ bool isFailed() const;
+ std::string getXid() const;
+};
+}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
DtxManager::~DtxManager() {}
@@ -53,8 +53,8 @@ void DtxManager::recover(const std::stri
createWork(xid)->recover(txn, ops);
}
-bool DtxManager::prepare(const std::string& xid)
-{
+bool DtxManager::prepare(const std::string& xid)
+{
QPID_LOG(debug, "preparing: " << xid);
try {
return getWork(xid)->prepare();
@@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::stri
}
}
-bool DtxManager::commit(const std::string& xid, bool onePhase)
-{
+bool DtxManager::commit(const std::string& xid, bool onePhase)
+{
QPID_LOG(debug, "committing: " << xid);
try {
bool result = getWork(xid)->commit(onePhase);
@@ -77,8 +77,8 @@ bool DtxManager::commit(const std::strin
}
}
-void DtxManager::rollback(const std::string& xid)
-{
+void DtxManager::rollback(const std::string& xid)
+{
QPID_LOG(debug, "rolling back: " << xid);
try {
getWork(xid)->rollback();
@@ -91,7 +91,7 @@ void DtxManager::rollback(const std::str
DtxWorkRecord* DtxManager::getWork(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const
return ptr_map_ptr(i);
}
+bool DtxManager::exists(const std::string& xid) {
+ Mutex::ScopedLock locker(lock);
+ return work.find(xid) != work.end();
+}
+
void DtxManager::remove(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -110,14 +115,15 @@ void DtxManager::remove(const std::strin
}
}
-DtxWorkRecord* DtxManager::createWork(std::string xid)
+DtxWorkRecord* DtxManager::createWork(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
- return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
+ std::string ncxid = xid; // Work around const correctness problems in ptr_map.
+ return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
}
}
@@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::s
}
timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
record->setTimeout(timeout);
- timer.add(timeout);
+ timer->add(timeout);
}
uint32_t DtxManager::getTimeout(const std::string& xid)
@@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const st
void DtxManager::timedout(const std::string& xid)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
QPID_LOG(warning, "Transaction timeout failed: no record for xid");
@@ -153,7 +159,7 @@ void DtxManager::timedout(const std::str
}
}
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
: TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
void DtxManager::DtxCleanup::fire()
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,8 +26,8 @@
#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/ptr_map.h"
namespace qpid {
namespace broker {
@@ -39,22 +39,21 @@ class DtxManager{
{
DtxManager& mgr;
const std::string& xid;
-
- DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+
+ DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- qpid::sys::Timer& timer;
+ qpid::sys::Timer* timer;
void remove(const std::string& xid);
- DtxWorkRecord* getWork(const std::string& xid);
- DtxWorkRecord* createWork(std::string xid);
+ DtxWorkRecord* createWork(const std::string& xid);
public:
- DtxManager(qpid::sys::Timer&);
+ DtxManager(sys::Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -66,6 +65,15 @@ public:
uint32_t getTimeout(const std::string& xid);
void timedout(const std::string& xid);
void setStore(TransactionalStore* store);
+ void setTimer(sys::Timer& t) { timer = &t; }
+
+ // Used by cluster for replication.
+ template<class F> void each(F f) const {
+ for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
+ f(*ptr_map_ptr(i));
+ }
+ DtxWorkRecord* getWork(const std::string& xid);
+ bool exists(const std::string& xid);
};
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp Fri Oct 21 14:42:12 2011
@@ -25,7 +25,7 @@
using namespace qpid::broker;
DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
- : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid)
{
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,7 +29,9 @@ namespace broker {
class DtxManager;
-struct DtxTimeoutException : public Exception {};
+struct DtxTimeoutException : public Exception {
+ DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {}
+};
struct DtxTimeout : public sys::TimerTask
{
@@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTas
DtxManager& mgr;
const std::string xid;
- DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
+ DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,19 +28,19 @@ using qpid::sys::Mutex;
using namespace qpid::broker;
using namespace qpid::framing;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
-DtxWorkRecord::~DtxWorkRecord()
+DtxWorkRecord::~DtxWorkRecord()
{
- if (timeout.get()) {
+ if (timeout.get()) {
timeout->cancel();
}
}
bool DtxWorkRecord::prepare()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (check()) {
txn = store->begin(xid);
if (prepare(txn.get())) {
@@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
bool DtxWorkRecord::commit(bool onePhase)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (check()) {
if (prepared) {
//already prepared i.e. 2pc
@@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase
store->commit(*txn);
txn.reset();
-
+
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
return true;
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
+ throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase
void DtxWorkRecord::rollback()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
check();
abort();
}
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
if (expired) {
- throw DtxTimeoutException();
+ throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
}
if (completed) {
throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
@@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_pt
void DtxWorkRecord::timedout()
{
- Mutex::ScopedLock locker(lock);
+ Mutex::ScopedLock locker(lock);
expired = true;
rolledback = true;
if (!completed) {
@@ -175,3 +175,17 @@ void DtxWorkRecord::timedout()
}
abort();
}
+
+size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
+ Work::iterator i = std::find(work.begin(), work.end(), buf);
+ if (i == work.end()) throw NotFoundException(
+ QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
+ return i - work.begin();
+}
+
+DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const { // Returns the buffer stored at position i of work.
+ if (i >= work.size()) // Valid positions are 0 .. size()-1; i == size() is already one past the end.
+ throw NotFoundException(
+ QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
+ return work[i];
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org