You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [6/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/Broker.h"
+
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/AsyncResultHandle.h"
#include "qpid/broker/ConfigAsyncContext.h"
#include "qpid/broker/ConfigHandle.h"
@@ -27,9 +29,7 @@
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
-//#include "qpid/broker/MessageStoreModule.h"
-//#include "qpid/broker/NullMessageStore.h"
-//#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/SecureConnectionFactory.h"
@@ -43,6 +43,7 @@
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
@@ -50,12 +51,12 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -75,6 +76,7 @@
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -110,6 +112,17 @@ namespace _qmf = qmf::org::apache::qpid:
namespace qpid {
namespace broker {
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
+const std::string knownHostsNone("none");
+
+// static
+Broker* Broker::thisBroker;
+
Broker::Options::Options(const std::string& name) :
qpid::Options(name),
noDataDir(0),
@@ -127,6 +140,7 @@ Broker::Options::Options(const std::stri
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
requireEncrypted(false),
+ knownHosts(knownHostsNone),
qmf2Support(true),
qmf1Support(true),
queueFlowStopRatio(80),
@@ -136,7 +150,7 @@ Broker::Options::Options(const std::stri
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
linkMaintenanceInterval(2),
linkHeartbeatInterval(120),
- maxNegotiateTime(2000) // 2s
+ maxNegotiateTime(10000) // 10s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -152,6 +166,7 @@ Broker::Options::Options(const std::stri
("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+ ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -175,23 +190,27 @@ Broker::Options::Options(const std::stri
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
- ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
- ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
- ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+ ("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"),
+ "Interval to check link health and re-connect if need be")
+ ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"),
+ "Heartbeat interval for a federation link")
+ ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
}
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-const std::string qpid_management("qpid.management");
-const std::string knownHostsNone("none");
+namespace {
+// Arguments to declare a non-replicated exchange.
+framing::FieldTable noReplicateArgs() {
+ framing::FieldTable args;
+ args.setString("qpid.replicate", "none");
+ return args;
+}
+}
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
+ timer(new qpid::sys::Timer),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
@@ -205,21 +224,18 @@ Broker::Broker(const Broker::Options& co
exchanges(this),
links(this),
factory(new SecureConnectionFactory(*this)),
- dtxManager(timer),
+ dtxManager(*timer.get()),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- mgmtObject(0),
- queueCleaner(queues, &timer),
- recovery(true),
- inCluster(false),
- clusterUpdatee(false),
+ queueCleaner(queues, timer.get()),
+ recoveryInProgress(false),
expiryPolicy(new ExpiryPolicy),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
- deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
+ thisBroker = this;
try {
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -231,7 +247,7 @@ Broker::Broker(const Broker::Options& co
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
systemObject = System::shared_ptr(system);
- mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker");
+ mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
@@ -281,28 +297,26 @@ Broker::Broker(const Broker::Options& co
// if (NullMessageStore::isNullStore(store.get()))
// setStore();
- exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ framing::FieldTable args;
+
+ // Default exchange is not replicated.
+ exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
// if (store.get() != 0) {
if (asyncStore.get() != 0) {
- // The cluster plug-in will setRecovery(false) on all but the first
- // broker to join a cluster.
- if (getRecovery()) {
QPID_LOG(info, "Store recovery starting")
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
RecoveryHandle rh = asyncStore->createRecoveryHandle();
- boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+ boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverCompleteCb, &asyncResultQueue));
asyncStore->submitRecover(rh, rac);
-// store->recover(recoverer);
- }
- else {
- QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
-// store->truncateInit(true); // save old files in subdir
- asyncStore->initialize(true, true);
- }
+// RecoveryManagerImpl recoverer(
+// queues, exchanges, links, dtxManager, protocolRegistry);
+// recoveryInProgress = true;
+// store->recover(recoverer);
+// recoveryInProgress = false;
}
// debug
- else QPID_LOG(info, ">>>> No store!!!!")
+// else QPID_LOG(info, ">>>> No store!!!!")
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -311,7 +325,7 @@ Broker::Broker(const Broker::Options& co
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs());
Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
@@ -320,8 +334,10 @@ Broker::Broker(const Broker::Options& co
std::string qmfTopic("qmf.default.topic");
std::string qmfDirect("qmf.default.direct");
- std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
- std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+ std::pair<Exchange::shared_ptr, bool> topicPair(
+ exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs()));
+ std::pair<Exchange::shared_ptr, bool> directPair(
+ exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs()));
boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
@@ -349,19 +365,17 @@ Broker::Broker(const Broker::Options& co
// Initialize plugins
Plugin::initializeAll(*this);
- if (managementAgent.get()) managementAgent->pluginsInitialized();
+ if(conf.enableMgmt) {
+ if (getAcl()) {
+ mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
+ }
+ }
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
- //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)):
- if (conf.knownHosts.empty()) {
- boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
- if (factory) {
- knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) );
- }
- } else if (conf.knownHosts != knownHostsNone) {
+ if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
knownBrokers.push_back(Url(conf.knownHosts));
}
@@ -375,11 +389,14 @@ void Broker::declareStandardExchange(con
{
// bool storeEnabled = store.get() != NULL;
bool storeEnabled = asyncStore.get() != NULL;
- std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+ framing::FieldTable args;
+ // Standard exchanges are not replicated.
+ std::pair<Exchange::shared_ptr, bool> status =
+ exchanges.declare(name, type, storeEnabled, noReplicateArgs());
if (status.second && storeEnabled) {
// store->create(*status.first, framing::FieldTable ());
ConfigHandle ch = asyncStore->createConfigHandle();
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitCreate(ch, status.first.get(), bc);
}
}
@@ -420,11 +437,20 @@ void Broker::setStore () {
}
// static
+void Broker::recoverCompleteCb(const AsyncResultHandle* const arh) {
+ thisBroker->recoverComplete(arh);
+}
+
void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+ recoveryInProgress = false;
std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
// static
+void Broker::configureCompleteCb(const AsyncResultHandle* const arh) {
+ thisBroker->configureComplete(arh);
+}
+
void Broker::configureComplete(const AsyncResultHandle* const arh) {
std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
@@ -464,13 +490,13 @@ Broker::~Broker() {
finalize(); // Finalize any plugins.
if (config.auth)
SaslAuthenticator::fini();
- timer.stop();
+ timer->stop();
QPID_LOG(notice, "Shut down");
}
-ManagementObject* Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable* Broker::GetVhostObject(void) const
@@ -536,7 +562,7 @@ Manageable::status_t Broker::ManagementM
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter) >= 0)
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
@@ -584,7 +610,22 @@ Manageable::status_t Broker::ManagementM
status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
break;
}
- default:
+
+ case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP:
+ {
+ dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
+ QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP:
+ {
+ setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
+ QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
+ status = Manageable::STATUS_OK;
+ break;
+ }
+ default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
@@ -750,7 +791,7 @@ void Broker::createObject(const std::str
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -772,7 +813,7 @@ void Broker::createObject(const std::str
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
- amqp_0_10::translate(extensions, arguments);
+ qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
@@ -1008,6 +1049,18 @@ std::string Broker::getLogLevel()
return level;
}
+void Broker::setLogHiresTimestamp(bool enabled)
+{
+ QPID_LOG(notice, "Changing log hires timestamp to " << enabled);
+ qpid::log::Logger::instance().setHiresTimestamp(enabled);
+}
+
+bool Broker::getLogHiresTimestamp()
+{
+ return qpid::log::Logger::instance().getHiresTimestamp();
+}
+
+
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
@@ -1036,39 +1089,29 @@ void Broker::accept() {
}
void Broker::connect(
+ const std::string& name,
const std::string& host, const std::string& port, const std::string& transport,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* f)
+ boost::function2<void, int, std::string> failed)
{
boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
- if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed);
+ if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
-void Broker::connect(
- const Url& url,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* f)
-{
- url.throwIfEmpty();
- const Address& addr=url[0];
- connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
-}
-
-uint32_t Broker::queueMoveMessages(
+int32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
const Variant::Map& filter)
{
- Queue::shared_ptr src_queue = queues.find(srcQueue);
- if (!src_queue)
- return 0;
- Queue::shared_ptr dest_queue = queues.find(destQueue);
- if (!dest_queue)
- return 0;
+ Queue::shared_ptr src_queue = queues.find(srcQueue);
+ if (!src_queue)
+ return -1;
+ Queue::shared_ptr dest_queue = queues.find(destQueue);
+ if (!dest_queue)
+ return -1;
- return src_queue->move(dest_queue, qty, &filter);
+ return (int32_t) src_queue->move(dest_queue, qty, &filter);
}
@@ -1083,12 +1126,6 @@ Broker::getKnownBrokersImpl()
bool Broker::deferDeliveryImpl(const std::string&, const Message&)
{ return false; }
-void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
- clusterTimer = t;
- queueCleaner.setTimer(clusterTimer.get());
- dtxManager.setTimer(*clusterTimer.get());
-}
-
const std::string Broker::TCP_TRANSPORT("tcp");
@@ -1109,9 +1146,14 @@ std::pair<boost::shared_ptr<Queue>, bool
params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
+ params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
+ params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast<string>(settings.maxFileSize)));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+
+ if (!acl->approveCreateQueue(userId,name) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
}
Exchange::shared_ptr alternate;
@@ -1120,21 +1162,12 @@ std::pair<boost::shared_ptr<Queue>, bool
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+ std::pair<Queue::shared_ptr, bool> result =
+ queues.declare(name, settings, alternate, false/*recovering*/,
+ owner, connectionId, userId);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
-
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(
- _qmf::EventQueueDeclare(connectionId, userId, name,
- settings.durable, owner, settings.autodelete, alternateExchange,
- settings.asMap(),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1149,6 +1182,10 @@ std::pair<boost::shared_ptr<Queue>, bool
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
+ QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
}
@@ -1156,19 +1193,13 @@ void Broker::deleteQueue(const std::stri
Queue::shared_ptr queue = queues.find(name);
if (queue) {
if (check) check(queue);
- queues.destroy(name);
+ if (acl)
+ acl->recordDestroyQueue(name);
+ queues.destroy(name, connectionId, userId);
queue->destroyed();
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
- );
-
}
std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1196,33 +1227,16 @@ std::pair<Exchange::shared_ptr, bool> Br
}
std::pair<Exchange::shared_ptr, bool> result;
- result = exchanges.declare(name, type, durable, arguments);
+ result = exchanges.declare(
+ name, type, durable, arguments, alternate, connectionId, userId);
if (result.second) {
- if (alternate) {
- result.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
if (durable) {
// store->create(*result.first, arguments);
ConfigHandle ch = asyncStore->createConfigHandle();
result.first->setHandle(ch);
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitCreate(ch, result.first.get(), bc);
}
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
- userId,
- name,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(arguments),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1236,6 +1250,9 @@ std::pair<Exchange::shared_ptr, bool> Br
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
+ QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
if (acl) {
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1246,21 +1263,15 @@ void Broker::deleteExchange(const std::s
}
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
- if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
// if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->isDurable()) {
- boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+ boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
asyncStore->submitDestroy(exchange->getHandle(), bc);
exchange->resetHandle();
}
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- exchanges.destroy(name);
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
- QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId);
+ exchanges.destroy(name, connectionId, userId);
}
void Broker::bind(const std::string& queueName,
@@ -1298,6 +1309,7 @@ void Broker::bind(const std::string& que
QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
+ << " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Thu Feb 28 16:14:30 2013
@@ -25,40 +25,30 @@
#include "qpid/broker/AsyncResultQueueImpl.h"
#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/ConnectionFactory.h"
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DirectExchange.h"
+
+#include "qpid/DataDir.h"
+#include "qpid/Options.h"
+#include "qpid/Plugin.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
//#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/QueueCleaner.h"
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
#include "qpid/broker/ConfigurationObservers.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/broker/Broker.h"
-#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
-#include "qpid/Options.h"
-#include "qpid/Plugin.h"
-#include "qpid/DataDir.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/types/Variant.h"
-#include "qpid/RefCounted.h"
-#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include <boost/intrusive_ptr.hpp>
+
#include <string>
#include <vector>
@@ -67,12 +57,14 @@ namespace qpid {
namespace sys {
class ProtocolFactory;
class Poller;
+class Timer;
}
struct Url;
namespace broker {
+class AclModule;
class ConnectionState;
class ExpiryPolicy;
class Message;
@@ -103,6 +95,7 @@ class Broker : public sys::Runnable, pub
bool noDataDir;
std::string dataDir;
uint16_t port;
+ std::vector<std::string> listenInterfaces;
int workerThreads;
int connectionBacklog;
bool enableMgmt;
@@ -139,10 +132,15 @@ class Broker : public sys::Runnable, pub
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
- static void recoverComplete(const AsyncResultHandle* const);
- static void configureComplete(const AsyncResultHandle* const);
+ static void recoverCompleteCb(const AsyncResultHandle* const);
+ static void configureCompleteCb(const AsyncResultHandle* const);
+ static Broker* thisBroker;
+ void recoverComplete(const AsyncResultHandle* const arh);
+ void configureComplete(const AsyncResultHandle* const arh);
void setLogLevel(const std::string& level);
std::string getLogLevel();
+ void setLogHiresTimestamp(bool enabled);
+ bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
@@ -158,8 +156,7 @@ class Broker : public sys::Runnable, pub
Manageable::status_t setTimestampConfig(const bool receive,
const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
- sys::Timer timer;
- std::auto_ptr<sys::Timer> clusterTimer;
+ std::auto_ptr<sys::Timer> timer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -178,7 +175,7 @@ class Broker : public sys::Runnable, pub
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
- qmf::org::apache::qpid::broker::Broker* mgmtObject;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
@@ -187,10 +184,10 @@ class Broker : public sys::Runnable, pub
bool deferDeliveryImpl(const std::string& queue,
const Message& msg);
std::string federationTag;
- bool recovery;
- bool inCluster, clusterUpdatee;
+ bool recoveryInProgress;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
+ ProtocolRegistry protocolRegistry;
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
@@ -233,6 +230,7 @@ class Broker : public sys::Runnable, pub
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; }
+ ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -240,7 +238,7 @@ class Broker : public sys::Runnable, pub
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
@@ -253,19 +251,17 @@ class Broker : public sys::Runnable, pub
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, const std::string& port,
+ void connect(const std::string& name,
+ const std::string& host, const std::string& port,
const std::string& transport,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* =0);
- /** Create a connection to another broker. */
- void connect(const Url& url,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* =0);
+ boost::function2<void, int, std::string> failed);
/** Move messages from one queue to another.
A zero quantity means to move all messages
+ Return -1 if one of the queues does not exist, otherwise
+ the number of messages moved.
*/
- QPID_BROKER_EXTERN uint32_t queueMoveMessages(
+ QPID_BROKER_EXTERN int32_t queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
@@ -277,46 +273,17 @@ class Broker : public sys::Runnable, pub
/** Expose poller so plugins can register their descriptors. */
QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller();
- boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
- void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
-
/** Timer for local tasks affecting only this broker */
- sys::Timer& getTimer() { return timer; }
-
- /** Timer for tasks that must be synchronized if we are in a cluster */
- sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; }
- QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr<sys::Timer>);
+ sys::Timer& getTimer() { return *timer; }
boost::function<std::vector<Url> ()> getKnownBrokers;
static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
- void setRecovery(bool set) { recovery = set; }
- bool getRecovery() const { return recovery; }
-
- /** True of this broker is part of a cluster.
- * Only valid after early initialization of plugins is complete.
- */
- bool isInCluster() const { return inCluster; }
- void setInCluster(bool set) { inCluster = set; }
-
- /** True if this broker is joining a cluster and in the process of
- * receiving a state update.
- */
- bool isClusterUpdatee() const { return clusterUpdatee; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ bool inRecovery() const { return recoveryInProgress; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- /**
- * Never true in a stand-alone broker. In a cluster, return true
- * to defer delivery of messages deliveredg in a cluster-unsafe
- * context.
- *@return true if delivery of a message should be deferred.
- */
- boost::function<bool (const std::string& queue,
- const Message& msg)> deferDelivery;
-
bool isAuthenticating ( ) { return config.auth; }
bool isTimestamping() { return config.timestampRcvMsgs; }
@@ -371,8 +338,10 @@ class Broker : public sys::Runnable, pub
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+ QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; }
/** Information identifying this system */
boost::shared_ptr<const System> getSystem() const { return systemObject; }
+ friend class StatusCheckThread;
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h Thu Feb 28 16:14:30 2013
@@ -38,6 +38,10 @@ class Exchange;
/**
* Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
*/
class ConfigurationObserver
{
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp Thu Feb 28 16:14:30 2013
@@ -25,9 +25,9 @@
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
-#include "qpid/broker/AclModule.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/ClusterSafe.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -86,20 +86,14 @@ Connection::Connection(ConnectionOutputH
std::string& mgmtId_,
const qpid::sys::SecuritySettings& external,
bool link_,
- uint64_t objectId_,
- bool shadow_,
- bool delayManagement,
- bool authenticated_
+ uint64_t objectId_
) :
ConnectionState(out_, broker_),
securitySettings(external),
- shadow(shadow_),
- authenticated(authenticated_),
adapter(*this, link_),
link(link_),
mgmtClosing(false),
mgmtId(mgmtId_),
- mgmtObject(0),
links(broker_.getLinks()),
agent(0),
timer(broker_.getTimer()),
@@ -109,11 +103,6 @@ Connection::Connection(ConnectionOutputH
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
- // In a cluster, allow adding the management object to be delayed.
- if (!delayManagement) addManagementObject();
-}
-
-void Connection::addManagementObject() {
assert(agent == 0);
assert(mgmtObject == 0);
Manageable* parent = broker.GetVhostObject();
@@ -121,8 +110,7 @@ void Connection::addManagementObject() {
agent = broker.getManagementAgent();
if (agent != 0) {
// TODO set last bool true if system connection
- mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false);
- mgmtObject->set_shadow(shadow);
+ mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
@@ -139,13 +127,11 @@ void Connection::requestIOProcessing(boo
Connection::~Connection()
{
if (mgmtObject != 0) {
- mgmtObject->resourceDestroy();
- // In a cluster, Connections destroyed during shutdown are in
- // a cluster-unsafe context. Don't raise an event in that case.
- if (!link && isClusterSafe())
- agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+ if (!link)
+ agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
<< " rhost:" << mgmtId );
+ mgmtObject->resourceDestroy();
}
broker.getConnectionObservers().closed(*this);
@@ -188,8 +174,7 @@ bool isMessage(const AMQMethodBody* meth
void Connection::recordFromServer(const framing::AMQFrame& frame)
{
- // Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
cStats->framesToClient += 1;
@@ -203,8 +188,7 @@ void Connection::recordFromServer(const
void Connection::recordFromClient(const framing::AMQFrame& frame)
{
- // Don't record management stats in cluster-unsafe contexts
- if (mgmtObject != 0 && isClusterSafe())
+ if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
cStats->framesFromClient += 1;
@@ -279,28 +263,7 @@ void Connection::notifyConnectionForced(
void Connection::setUserId(const string& userId)
{
- // Account for changing userId
- AclModule* acl = broker.getAcl();
- if (acl)
- {
- acl->setUserId(*this, userId);
- }
-
ConnectionState::setUserId(userId);
- // In a cluster, the cluster code will raise the connect event
- // when the connection is replicated to the cluster.
- if (!broker.isInCluster()) raiseConnectEvent();
-}
-
-void Connection::raiseConnectEvent() {
- if (mgmtObject != 0) {
- mgmtObject->set_authIdentity(userId);
- agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
- }
-
- QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
- << " rhost:" << mgmtId );
-
}
void Connection::setUserProxyAuth(bool b)
@@ -327,19 +290,6 @@ void Connection::close(connection::Close
getOutput().close();
}
-// Send a close to the client but keep the channels. Used by cluster.
-void Connection::sendClose() {
- if (heartbeatTimer)
- heartbeatTimer->cancel();
- if (timeoutTimer)
- timeoutTimer->cancel();
- if (linkHeartbeatTimer) {
- linkHeartbeatTimer->cancel();
- }
- adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
- getOutput().close();
-}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
@@ -364,9 +314,6 @@ void Connection::closed(){ // Physically
void Connection::doIoCallbacks() {
if (!isOpen()) return; // Don't process IO callbacks until we are open.
ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
- ClusterUnsafeScope cus;
while (!ioCallbacks.empty()) {
boost::function0<void> cb = ioCallbacks.front();
ioCallbacks.pop();
@@ -413,9 +360,9 @@ SessionHandler& Connection::getChannel(C
return *ptr_map_ptr(i);
}
-ManagementObject* Connection::GetManagementObject(void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&)
@@ -499,7 +446,7 @@ void Connection::abort()
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
- if (heartbeat > 0 && !isShadow()) {
+ if (heartbeat > 0) {
if (!heartbeatTimer) {
heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
timer.add(heartbeatTimer);
@@ -535,7 +482,6 @@ void Connection::OutboundFrameTracker::c
size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
void Connection::OutboundFrameTracker::abort() { next->abort(); }
void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
{
next->send(f);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h Thu Feb 28 16:14:30 2013
@@ -30,24 +30,13 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionHandler.h"
#include "qpid/broker/ConnectionState.h"
-#include "qpid/broker/SessionHandler.h"
-#include "qmf/org/apache/qpid/broker/Connection.h"
-#include "qpid/Exception.h"
-#include "qpid/RefCounted.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/ptr_map.h"
-#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+#include "qpid/ptr_map.h"
+
+#include "qmf/org/apache/qpid/broker/Connection.h"
#include <boost/ptr_container/ptr_map.hpp>
#include <boost/bind.hpp>
@@ -55,11 +44,17 @@
#include <algorithm>
namespace qpid {
+namespace sys {
+class Timer;
+class TimerTask;
+}
namespace broker {
class Broker;
class LinkRegistry;
+class Queue;
class SecureConnection;
+class SessionHandler;
struct ConnectionTimeoutTask;
class Connection : public sys::ConnectionInputHandler,
@@ -83,10 +78,7 @@ class Connection : public sys::Connectio
const std::string& mgmtId,
const qpid::sys::SecuritySettings&,
bool isLink = false,
- uint64_t objectId = 0,
- bool shadow=false,
- bool delayManagement = false,
- bool authenticated=true);
+ uint64_t objectId = 0);
~Connection ();
@@ -112,7 +104,7 @@ class Connection : public sys::Connectio
void closeChannel(framing::ChannelId channel);
// Manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
@@ -130,7 +122,6 @@ class Connection : public sys::Connectio
void notifyConnectionForced(const std::string& text);
void setUserId(const std::string& uid);
- void raiseConnectEvent();
// credentials for connected client
const std::string& getUserId() const { return ConnectionState::getUserId(); }
@@ -144,27 +135,14 @@ class Connection : public sys::Connectio
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
void restartTimeout();
-
+
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
f(*ptr_map_ptr(i));
}
- void sendClose();
void setSecureConnection(SecureConnection* secured);
- /** True if this is a shadow connection in a cluster. */
- bool isShadow() const { return shadow; }
-
- /** True if this connection is authenticated */
- bool isAuthenticated() const { return authenticated; }
-
- // Used by cluster to update connection status
- sys::AggregateOutput& getOutputTasks() { return outputTasks; }
-
- /** Cluster delays adding management object in the constructor then calls this. */
- void addManagementObject();
-
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
{
return securitySettings;
@@ -176,9 +154,6 @@ class Connection : public sys::Connectio
bool isLink() { return link; }
void startLinkHeartbeatTimeoutTask();
- // Used by cluster during catch-up, see cluster::OutputInterceptor
- void doIoCallbacks();
-
void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
const framing::FieldTable& getClientProperties() const { return clientProperties; }
@@ -188,15 +163,13 @@ class Connection : public sys::Connectio
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
- bool shadow;
- bool authenticated;
ConnectionHandler adapter;
const bool link;
bool mgmtClosing;
const std::string mgmtId;
sys::Mutex ioCallbackLock;
std::queue<boost::function0<void> > ioCallbacks;
- qmf::org::apache::qpid::broker::Connection* mgmtObject;
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
@@ -218,7 +191,6 @@ class Connection : public sys::Connectio
size_t getBuffered() const;
void abort();
void activateOutput();
- void giveReadCredit(int32_t credit);
void send(framing::AMQFrame&);
void wrap(sys::ConnectionOutputHandlerPtr&);
private:
@@ -228,10 +200,11 @@ class Connection : public sys::Connectio
OutboundFrameTracker outboundTracker;
void sent(const framing::AMQFrame& f);
+ void doIoCallbacks();
public:
- qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
+ qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
};
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Feb 28 16:14:30 2013
@@ -20,8 +20,10 @@
*
*/
-#include "qpid/SaslFactory.h"
#include "qpid/broker/ConnectionHandler.h"
+
+#include "qpid/SaslFactory.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/Url.h"
@@ -30,8 +32,10 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qmf/org/apache/qpid/broker/EventClientConnectFail.h"
using namespace qpid;
@@ -148,6 +152,24 @@ void ConnectionHandler::Handler::startOk
void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
{
+ const framing::FieldTable& clientProperties = body.getClientProperties();
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
+
+ if (mgmtObject != 0) {
+ string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
+ uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
+ uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
+
+ types::Variant::Map properties;
+ qpid::amqp_0_10::translate(clientProperties, properties);
+ mgmtObject->set_remoteProperties(properties);
+ if (!procName.empty())
+ mgmtObject->set_remoteProcessName(procName);
+ if (pid != 0)
+ mgmtObject->set_remotePid(pid);
+ if (ppid != 0)
+ mgmtObject->set_remoteParentPid(ppid);
+ }
try {
authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
} catch (std::exception& /*e*/) {
@@ -160,8 +182,9 @@ void ConnectionHandler::Handler::startOk
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- if (agent) {
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent && mgmtObject) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error,
+ mgmtObject->get_remoteProperties()));
}
QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
<< " user:" << uid
@@ -169,9 +192,8 @@ void ConnectionHandler::Handler::startOk
}
throw;
}
- const framing::FieldTable& clientProperties = body.getClientProperties();
- connection.setClientProperties(clientProperties);
+ connection.setClientProperties(clientProperties);
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
@@ -187,19 +209,6 @@ void ConnectionHandler::Handler::startOk
}
QPID_LOG(info, "Connection is a federation link");
}
-
- if (connection.getMgmtObject() != 0) {
- string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
- uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
- uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
-
- if (!procName.empty())
- connection.getMgmtObject()->set_remoteProcessName(procName);
- if (pid != 0)
- connection.getMgmtObject()->set_remotePid(pid);
- if (ppid != 0)
- connection.getMgmtObject()->set_remoteParentPid(ppid);
- }
}
void ConnectionHandler::Handler::secureOk(const string& response)
@@ -216,8 +225,9 @@ void ConnectionHandler::Handler::secureO
string uid;
authenticator->getError(error);
authenticator->getUid(uid);
- if (agent) {
- agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+ if (agent && connection.getMgmtObject()) {
+ agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error,
+ connection.getMgmtObject()->get_remoteProperties()));
}
QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
<< " user:" << uid
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h Thu Feb 28 16:14:30 2013
@@ -21,18 +21,22 @@
#ifndef _ConnectionState_
#define _ConnectionState_
-#include <vector>
-
+#include "qpid/broker/ConnectionToken.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/ConnectionOutputHandlerPtr.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/management/Manageable.h"
#include "qpid/Url.h"
-#include "qpid/broker/Broker.h"
+
+#include <boost/function.hpp>
+#include <vector>
+
namespace qpid {
namespace broker {
+class Broker;
+
class ConnectionState : public ConnectionToken, public management::Manageable
{
protected:
@@ -46,9 +50,8 @@ class ConnectionState : public Connectio
framemax(65535),
heartbeat(0),
heartbeatmax(120),
- userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
+ userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links)
federationLink(true),
- clusterOrderOut(0),
isDefaultRealm(false)
{}
@@ -62,14 +65,7 @@ class ConnectionState : public Connectio
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- virtual void setUserId(const std::string& uid) {
- userId = uid;
- size_t at = userId.find('@');
- userName = userId.substr(0, at);
- isDefaultRealm = (
- at!= std::string::npos &&
- getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
- }
+ virtual void setUserId(const std::string& uid);
const std::string& getUserId() const { return userId; }
@@ -102,15 +98,6 @@ class ConnectionState : public Connectio
framing::ProtocolVersion getVersion() const { return version; }
void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
- /**
- * If the broker is part of a cluster, this is a handler provided
- * by cluster code. It ensures consistent ordering of commands
- * that are sent based on criteria that are not predictably
- * ordered cluster-wide, e.g. a timer firing.
- */
- framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
- void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
-
virtual void requestIOProcessing (boost::function0<void>) = 0;
protected:
@@ -124,7 +111,6 @@ class ConnectionState : public Connectio
bool federationLink;
std::string federationPeerTag;
std::vector<Url> knownHosts;
- framing::FrameHandler* clusterOrderOut;
std::string userName;
bool isDefaultRealm;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h Thu Feb 28 16:14:30 2013
@@ -79,6 +79,15 @@ class Consumer : public QueueCursor {
*/
virtual bool hideDeletedError() { return false; }
+ /** If false, the consumer is not counted for purposes of auto-deletion or
+ * immediate messages. This is used for "system" consumers that are created
+ * by the broker for internal purposes as opposed to consumers that are
+ * created by normal clients.
+ */
+ virtual bool isCounted() { return true; }
+
+ QueueCursor getCursor() const { return *this; }
+ void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; }
protected:
//framing::SequenceNumber position;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h Thu Feb 28 16:14:30 2013
@@ -25,11 +25,14 @@
// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
// Refactor to use a more abstract interface.
-#include "qpid/broker/SemanticState.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+class SemanticState;
+class SemanticStateConsumerImpl;
+
/**
* Base class for consumer factoires. Plugins can register a
* ConsumerFactory via Broker:: getConsumerFactories() Each time a
@@ -41,7 +44,7 @@ class ConsumerFactory
public:
virtual ~ConsumerFactory() {}
- virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ virtual boost::shared_ptr<SemanticStateConsumerImpl> create(
SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive, const std::string& tag,
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp Thu Feb 28 16:14:30 2013
@@ -36,8 +36,3 @@ Message& DeliverableMessage::getMessage(
{
return msg;
}
-
-uint64_t DeliverableMessage::contentSize()
-{
- return msg.getContentSize();
-}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h Thu Feb 28 16:14:30 2013
@@ -36,7 +36,6 @@ namespace qpid {
QPID_BROKER_EXTERN DeliverableMessage(const Message& msg, TxBuffer* txn);
QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
QPID_BROKER_EXTERN Message& getMessage();
- QPID_BROKER_EXTERN uint64_t contentSize();
virtual ~DeliverableMessage(){}
};
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Thu Feb 28 16:14:30 2013
@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin));
BoundKey& bk = bindings[routingKey];
if (exclusiveBinding) bk.queues.clear();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp Thu Feb 28 16:14:30 2013
@@ -27,6 +27,9 @@
#include "qpid/ptr_map.h"
#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
#include <iostream>
using boost::intrusive_ptr;
@@ -35,6 +38,30 @@ using qpid::ptr_map_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
+namespace {
+ typedef boost::function0<void> FireFunction;
+ struct DtxCleanup : public qpid::sys::TimerTask
+ {
+ FireFunction fireFunction;
+
+ DtxCleanup(uint32_t timeout, FireFunction f);
+ void fire();
+ };
+
+ DtxCleanup::DtxCleanup(uint32_t _timeout, FireFunction f)
+ : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), fireFunction(f){}
+
+ void DtxCleanup::fire()
+ {
+ try {
+ fireFunction();
+ } catch (qpid::ConnectionException& /*e*/) {
+ //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+ }
+ }
+
+}
+
//DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {}
@@ -158,19 +185,7 @@ void DtxManager::timedout(const std::str
} else {
ptr_map_ptr(i)->timedout();
//TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion?
- //timer.add(intrusive_ptr<TimerTask>(new DtxCleanup(60*30/*30 mins*/, *this, xid)));
- }
-}
-
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
- : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
-
-void DtxManager::DtxCleanup::fire()
-{
- try {
- mgr.remove(xid);
- } catch (ConnectionException& /*e*/) {
- //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+ //timer->add(new DtxCleanup(60*30/*30 mins*/, boost::bind(&DtxManager::remove, this, xid)));
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h Thu Feb 28 16:14:30 2013
@@ -32,20 +32,15 @@
#include "qpid/ptr_map.h"
namespace qpid {
+namespace sys {
+class Timer;
+}
+
namespace broker {
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
- struct DtxCleanup : public sys::TimerTask
- {
- DtxManager& mgr;
- const std::string& xid;
-
- DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
- void fire();
- };
-
WorkMap work;
// TransactionalStore* store;
AsyncTransactionalStore* asyncTxnStore;
@@ -71,11 +66,6 @@ public:
void setStore(AsyncTransactionalStore* const ats);
void setTimer(sys::Timer& t) { timer = &t; }
- // Used by cluster for replication.
- template<class F> void each(F f) const {
- for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
- f(*ptr_map_ptr(i));
- }
DtxWorkRecord* getWork(const std::string& xid);
bool exists(const std::string& xid);
static std::string convert(const framing::Xid& xid);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp Thu Feb 28 16:14:30 2013
@@ -20,7 +20,10 @@
*/
#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/DtxManager.h"
+#include "qpid/broker/DtxTimeout.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Timer.h"
+
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
@@ -39,6 +42,12 @@ DtxWorkRecord::~DtxWorkRecord()
}
}
+void DtxWorkRecord::setTimeout(boost::intrusive_ptr<DtxTimeout> t)
+{ timeout = t; }
+
+boost::intrusive_ptr<DtxTimeout> DtxWorkRecord::getTimeout()
+{ return timeout; }
+
bool DtxWorkRecord::prepare()
{
Mutex::ScopedLock locker(lock);
@@ -182,17 +191,3 @@ void DtxWorkRecord::timedout()
}
abort();
}
-
-size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
- Work::iterator i = std::find(work.begin(), work.end(), buf);
- if (i == work.end()) throw NotFoundException(
- QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
- return i - work.begin();
-}
-
-DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
- if (i > work.size())
- throw NotFoundException(
- QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
- return work[i];
-}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h Thu Feb 28 16:14:30 2013
@@ -24,7 +24,6 @@
#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
@@ -39,6 +38,8 @@
namespace qpid {
namespace broker {
+struct DtxTimeout;
+
/**
* Represents the work done under a particular distributed transaction
* across potentially multiple channels. Identified by a xid. Allows
@@ -74,19 +75,13 @@ public:
QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops);
void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
void timedout();
- void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
- boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+ void setTimeout(boost::intrusive_ptr<DtxTimeout> t);
+ boost::intrusive_ptr<DtxTimeout> getTimeout();
std::string getXid() const { return xid; }
bool isCompleted() const { return completed; }
bool isRolledback() const { return rolledback; }
bool isPrepared() const { return prepared; }
bool isExpired() const { return expired; }
-
- // Used by cluster update;
- size_t size() const { return work.size(); }
- DtxBuffer::shared_ptr operator[](size_t i) const;
- uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
- size_t indexOf(const DtxBuffer::shared_ptr&);
};
}} // qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp Thu Feb 28 16:14:30 2013
@@ -167,19 +167,19 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+ sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+ mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
}
@@ -187,20 +187,20 @@ Exchange::Exchange (const string& _name,
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+ args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+ mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
@@ -212,8 +212,6 @@ Exchange::Exchange(const string& _name,
ive = _args.get(qpidIVE);
if (ive) {
- if (broker && broker->isInCluster())
- throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
}
}
@@ -227,6 +225,7 @@ Exchange::~Exchange ()
void Exchange::setAlternate(Exchange::shared_ptr _alternate)
{
alternate = _alternate;
+ alternate->incAlternateUsers();
if (mgmtExchange != 0) {
if (alternate.get() != 0)
mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
@@ -296,9 +295,9 @@ void Exchange::recoveryComplete(Exchange
}
}
-ManagementObject* Exchange::GetManagementObject (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtExchange;
+ return mgmtExchange;
}
void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store)
@@ -347,16 +346,16 @@ void Exchange::propagateFedOp(const stri
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
FieldTable _args, const string& _origin, ConfigHandle _cfgHandle)
- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle)
{
}
Exchange::Binding::~Binding ()
{
if (mgmtBinding != 0) {
- ManagementObject* mo = queue->GetManagementObject();
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0)
- static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
+ mo->dec_bindingCount();
mgmtBinding->resourceDestroy ();
}
}
@@ -369,25 +368,25 @@ void Exchange::Binding::startManagement(
if (broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- ManagementObject* mo = queue->GetManagementObject();
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0) {
management::ObjectId queueId = mo->getObjectId();
- mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
+ mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding
+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)));
if (!origin.empty())
mgmtBinding->set_origin(origin);
agent->addObject(mgmtBinding);
- static_cast<_qmf::Queue*>(mo)->inc_bindingCount();
+ mo->inc_bindingCount();
}
}
}
}
}
-ManagementObject* Exchange::Binding::GetManagementObject () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
{
- return (ManagementObject*) mgmtBinding;
+ return mgmtBinding;
}
uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence
@@ -434,5 +433,10 @@ bool Exchange::routeWithAlternate(Delive
return msg.delivered;
}
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+ args = newArgs;
+ if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Thu Feb 28 16:14:30 2013
@@ -55,14 +55,14 @@ public:
const framing::FieldTable args;
std::string origin;
ConfigHandle cfgHandle;
- qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+ qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding;
Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string(),
ConfigHandle cfgHandle = ConfigHandle());
~Binding();
void startManagement();
- management::ManagementObject* GetManagementObject() const;
+ management::ManagementObject::shared_ptr GetManagementObject() const;
// DataSource implementation - allows for persistence
uint64_t getSize();
@@ -170,8 +170,8 @@ protected:
}
};
- qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
+ qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
@@ -184,7 +184,8 @@ public:
const std::string& getName() const { return name; }
bool isDurable() { return durable; }
- qpid::framing::FieldTable& getArgs() { return args; }
+ QPID_BROKER_EXTERN const qpid::framing::FieldTable& getArgs() const { return args; }
+ QPID_BROKER_EXTERN void setArgs(const framing::FieldTable&);
QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
@@ -221,7 +222,7 @@ public:
static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
// Federation hooks
class DynamicBridge {
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Feb 28 16:14:30 2013
@@ -29,20 +29,26 @@
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
using std::string;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
return declare(name, type, false, FieldTable());
}
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
- bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+ const string& name, const string& type, bool durable, const FieldTable& args,
+ Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
Exchange::shared_ptr exchange;
std::pair<Exchange::shared_ptr, bool> result;
{
@@ -73,31 +79,55 @@ pair<Exchange::shared_ptr, bool> Exchang
}
exchanges[name] = exchange;
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ if (alternate) exchange->setAlternate(alternate);
+ // Call exchangeCreate inside the lock to ensure correct ordering.
+ if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
} else {
result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && broker->getManagementAgent()) {
+ // Call raiseEvent inside the lock to ensure correct ordering.
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDeclare(
+ connectionId,
+ userId,
+ name,
+ type,
+ alternate ? alternate->getName() : string(),
+ durable,
+ false,
+ ManagementAgent::toMap(result.first->getArgs()),
+ result.second ? "created" : "existing"));
+ }
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
return result;
}
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
if (name.empty() ||
(name.find("amq.") == 0 &&
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- Exchange::shared_ptr exchange;
{
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
- exchange = i->second;
+ if (broker) {
+ // Call exchangeDestroy and raiseEvent inside the lock to ensure
+ // correct ordering.
+ broker->getConfigurationObservers().exchangeDestroy(i->second);
+ if (broker->getManagementAgent())
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDelete(connectionId, userId, name));
+ }
i->second->destroy();
exchanges.erase(i);
+
}
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h Thu Feb 28 16:14:30 2013
@@ -46,14 +46,23 @@ class ExchangeRegistry{
bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name, const std::string& type);
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name,
- const std::string& type,
- bool durable,
- const qpid::framing::FieldTable& args = framing::FieldTable());
- QPID_BROKER_EXTERN void destroy(const std::string& name);
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name, const std::string& type);
+
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args = framing::FieldTable(),
+ Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
+ QPID_BROKER_EXTERN void destroy(
+ const std::string& name,
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
/**
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Thu Feb 28 16:14:30 2013
@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_
bool propagate = false;
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
- Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org