You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [6/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/ruby...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.cpp Thu Feb 28 16:14:30 2013
@@ -20,6 +20,8 @@
  */
 
 #include "qpid/broker/Broker.h"
+
+#include "qpid/broker/AclModule.h"
 #include "qpid/broker/AsyncResultHandle.h"
 #include "qpid/broker/ConfigAsyncContext.h"
 #include "qpid/broker/ConfigHandle.h"
@@ -27,9 +29,7 @@
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
-//#include "qpid/broker/MessageStoreModule.h"
-//#include "qpid/broker/NullMessageStore.h"
-//#include "qpid/broker/RecoveryAsyncContext.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/broker/SaslAuthenticator.h"
 #include "qpid/broker/SecureConnectionFactory.h"
@@ -43,6 +43,7 @@
 #include "qpid/broker/MessageGroupManager.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
@@ -50,12 +51,12 @@
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventUnbind.h"
 #include "qpid/amqp_0_10/Codecs.h"
@@ -75,6 +76,7 @@
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/TimeoutHandler.h"
@@ -110,6 +112,17 @@ namespace _qmf = qmf::org::apache::qpid:
 namespace qpid {
 namespace broker {
 
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
+const std::string knownHostsNone("none");
+
+// static
+Broker* Broker::thisBroker;
+
 Broker::Options::Options(const std::string& name) :
     qpid::Options(name),
     noDataDir(0),
@@ -127,6 +140,7 @@ Broker::Options::Options(const std::stri
     queueLimit(100*1048576/*100M default limit*/),
     tcpNoDelay(false),
     requireEncrypted(false),
+    knownHosts(knownHostsNone),
     qmf2Support(true),
     qmf1Support(true),
     queueFlowStopRatio(80),
@@ -136,7 +150,7 @@ Broker::Options::Options(const std::stri
     timestampRcvMsgs(false),    // set the 0.10 timestamp delivery property
     linkMaintenanceInterval(2),
     linkHeartbeatInterval(120),
-    maxNegotiateTime(2000)      // 2s
+    maxNegotiateTime(10000)     // 10s
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -152,6 +166,7 @@ Broker::Options::Options(const std::stri
         ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
         ("no-data-dir", optValue(noDataDir), "Don't use a data directory.  No persistent configuration will be loaded or stored")
         ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+        ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
         ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -175,23 +190,27 @@ Broker::Options::Options(const std::stri
         ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
         ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
         ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
-        ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS"))
-        ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"))
-        ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation")
+        ("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"),
+         "Interval to check link health and re-connect  if need be")
+        ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"),
+         "Heartbeat interval for a federation link")
+        ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
         ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
         ;
 }
 
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-const std::string qpid_management("qpid.management");
-const std::string knownHostsNone("none");
+namespace {
+// Arguments to declare a non-replicated exchange.
+framing::FieldTable noReplicateArgs() {
+    framing::FieldTable args;
+    args.setString("qpid.replicate", "none");
+    return args;
+}
+}
 
 Broker::Broker(const Broker::Options& conf) :
     poller(new Poller),
+    timer(new qpid::sys::Timer),
     config(conf),
     managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
                                                           conf.qmf2Support)
@@ -205,21 +224,18 @@ Broker::Broker(const Broker::Options& co
     exchanges(this),
     links(this),
     factory(new SecureConnectionFactory(*this)),
-    dtxManager(timer),
+    dtxManager(*timer.get()),
     sessionManager(
         qpid::SessionState::Configuration(
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
         *this),
-    mgmtObject(0),
-    queueCleaner(queues, &timer),
-    recovery(true),
-    inCluster(false),
-    clusterUpdatee(false),
+    queueCleaner(queues, timer.get()),
+    recoveryInProgress(false),
     expiryPolicy(new ExpiryPolicy),
-    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
-    deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
+    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
+    thisBroker = this;
     try {
     if (conf.enableMgmt) {
         QPID_LOG(info, "Management enabled");
@@ -231,7 +247,7 @@ Broker::Broker(const Broker::Options& co
         System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
         systemObject = System::shared_ptr(system);
 
-        mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker");
+        mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
         mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
         mgmtObject->set_port(conf.port);
         mgmtObject->set_workerThreads(conf.workerThreads);
@@ -281,28 +297,26 @@ Broker::Broker(const Broker::Options& co
 //    if (NullMessageStore::isNullStore(store.get()))
 //        setStore();
 
-    exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+    framing::FieldTable args;
+
+    // Default exchnge is not replicated.
+    exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
 
 //    if (store.get() != 0) {
     if (asyncStore.get() != 0) {
-        // The cluster plug-in will setRecovery(false) on all but the first
-        // broker to join a cluster.
-        if (getRecovery()) {
             QPID_LOG(info, "Store recovery starting")
-            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager);
+            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
             RecoveryHandle rh = asyncStore->createRecoveryHandle();
-            boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue));
+            boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverCompleteCb, &asyncResultQueue));
             asyncStore->submitRecover(rh, rac);
-//            store->recover(recoverer);
-        }
-        else {
-            QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
-//            store->truncateInit(true); // save old files in subdir
-            asyncStore->initialize(true, true);
-        }
+//        RecoveryManagerImpl recoverer(
+//            queues, exchanges, links, dtxManager, protocolRegistry);
+//        recoveryInProgress = true;
+//        store->recover(recoverer);
+//        recoveryInProgress = false;
     }
 // debug
-    else QPID_LOG(info, ">>>> No store!!!!")
+//    else QPID_LOG(info, ">>>> No store!!!!")
 
     //ensure standard exchanges exist (done after recovery from store)
     declareStandardExchange(amq_direct, DirectExchange::typeName);
@@ -311,7 +325,7 @@ Broker::Broker(const Broker::Options& co
     declareStandardExchange(amq_match, HeadersExchange::typeName);
 
     if(conf.enableMgmt) {
-        exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+        exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs());
         Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
         Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
         managementAgent->setExchange(mExchange, dExchange);
@@ -320,8 +334,10 @@ Broker::Broker(const Broker::Options& co
         std::string qmfTopic("qmf.default.topic");
         std::string qmfDirect("qmf.default.direct");
 
-        std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
-        std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+        std::pair<Exchange::shared_ptr, bool> topicPair(
+            exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs()));
+        std::pair<Exchange::shared_ptr, bool> directPair(
+            exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs()));
 
         boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
         boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
@@ -349,19 +365,17 @@ Broker::Broker(const Broker::Options& co
     // Initialize plugins
     Plugin::initializeAll(*this);
 
-    if (managementAgent.get()) managementAgent->pluginsInitialized();
+    if(conf.enableMgmt) {
+        if (getAcl()) {
+            mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
+        }
+    }
 
     if (conf.queueCleanInterval) {
         queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
     }
 
-    //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)):
-    if (conf.knownHosts.empty()) {
-        boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT);
-        if (factory) {
-            knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) );
-        }
-    } else if (conf.knownHosts != knownHostsNone) {
+    if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
         knownBrokers.push_back(Url(conf.knownHosts));
     }
 
@@ -375,11 +389,14 @@ void Broker::declareStandardExchange(con
 {
 //    bool storeEnabled = store.get() != NULL;
     bool storeEnabled = asyncStore.get() != NULL;
-    std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+    framing::FieldTable args;
+    // Standard exchanges are not replicated.
+    std::pair<Exchange::shared_ptr, bool> status =
+        exchanges.declare(name, type, storeEnabled, noReplicateArgs());
     if (status.second && storeEnabled) {
 //        store->create(*status.first, framing::FieldTable ());
         ConfigHandle ch = asyncStore->createConfigHandle();
-        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
         asyncStore->submitCreate(ch, status.first.get(), bc);
     }
 }
@@ -420,11 +437,20 @@ void Broker::setStore () {
 }
 
 // static
+void Broker::recoverCompleteCb(const AsyncResultHandle* const arh) {
+    thisBroker->recoverComplete(arh);
+}
+
 void Broker::recoverComplete(const AsyncResultHandle* const arh) {
+    recoveryInProgress = false;
     std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
 }
 
 // static
+void Broker::configureCompleteCb(const AsyncResultHandle* const arh) {
+    thisBroker->configureComplete(arh);
+}
+
 void Broker::configureComplete(const AsyncResultHandle* const arh) {
     std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
 }
@@ -464,13 +490,13 @@ Broker::~Broker() {
     finalize();                 // Finalize any plugins.
     if (config.auth)
         SaslAuthenticator::fini();
-    timer.stop();
+    timer->stop();
     QPID_LOG(notice, "Shut down");
 }
 
-ManagementObject* Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
 {
-    return (ManagementObject*) mgmtObject;
+    return mgmtObject;
 }
 
 Manageable* Broker::GetVhostObject(void) const
@@ -536,7 +562,7 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
             dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
         QPID_LOG (debug, "Broker::queueMoveMessages()");
-        if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
+        if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter) >= 0)
             status = Manageable::STATUS_OK;
         else
             return Manageable::STATUS_PARAMETER_INVALID;
@@ -584,7 +610,22 @@ Manageable::status_t Broker::ManagementM
           status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
           break;
         }
-   default:
+
+    case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP:
+    {
+        dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
+        QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
+        status = Manageable::STATUS_OK;
+        break;
+    }
+    case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP:
+    {
+        setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
+        QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
+        status = Manageable::STATUS_OK;
+        break;
+    }
+    default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
         break;
@@ -750,7 +791,7 @@ void Broker::createObject(const std::str
             else extensions[i->first] = i->second;
         }
         framing::FieldTable arguments;
-        amqp_0_10::translate(extensions, arguments);
+        qpid::amqp_0_10::translate(extensions, arguments);
 
         try {
             std::pair<boost::shared_ptr<Exchange>, bool> result =
@@ -772,7 +813,7 @@ void Broker::createObject(const std::str
             else extensions[i->first] = i->second;
         }
         framing::FieldTable arguments;
-        amqp_0_10::translate(extensions, arguments);
+        qpid::amqp_0_10::translate(extensions, arguments);
 
         bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
 
@@ -1008,6 +1049,18 @@ std::string Broker::getLogLevel()
     return level;
 }
 
+void Broker::setLogHiresTimestamp(bool enabled)
+{
+    QPID_LOG(notice, "Changing log hires timestamp to " << enabled);
+    qpid::log::Logger::instance().setHiresTimestamp(enabled);
+}
+
+bool Broker::getLogHiresTimestamp()
+{
+    return qpid::log::Logger::instance().getHiresTimestamp();
+}
+
+
 boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
     ProtocolFactoryMap::const_iterator i
         = name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
@@ -1036,39 +1089,29 @@ void Broker::accept() {
 }
 
 void Broker::connect(
+    const std::string& name,
     const std::string& host, const std::string& port, const std::string& transport,
-    boost::function2<void, int, std::string> failed,
-    sys::ConnectionCodec::Factory* f)
+    boost::function2<void, int, std::string> failed)
 {
     boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
-    if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed);
+    if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
     else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
 }
 
-void Broker::connect(
-    const Url& url,
-    boost::function2<void, int, std::string> failed,
-    sys::ConnectionCodec::Factory* f)
-{
-    url.throwIfEmpty();
-    const Address& addr=url[0];
-    connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
-}
-
-uint32_t Broker::queueMoveMessages(
+int32_t Broker::queueMoveMessages(
      const std::string& srcQueue,
      const std::string& destQueue,
      uint32_t  qty,
      const Variant::Map& filter)
 {
-  Queue::shared_ptr src_queue = queues.find(srcQueue);
-  if (!src_queue)
-    return 0;
-  Queue::shared_ptr dest_queue = queues.find(destQueue);
-  if (!dest_queue)
-    return 0;
+    Queue::shared_ptr src_queue = queues.find(srcQueue);
+    if (!src_queue)
+        return -1;
+    Queue::shared_ptr dest_queue = queues.find(destQueue);
+    if (!dest_queue)
+        return -1;
 
-  return src_queue->move(dest_queue, qty, &filter);
+    return (int32_t) src_queue->move(dest_queue, qty, &filter);
 }
 
 
@@ -1083,12 +1126,6 @@ Broker::getKnownBrokersImpl()
 bool Broker::deferDeliveryImpl(const std::string&, const Message&)
 { return false; }
 
-void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
-    clusterTimer = t;
-    queueCleaner.setTimer(clusterTimer.get());
-    dtxManager.setTimer(*clusterTimer.get());
-}
-
 const std::string Broker::TCP_TRANSPORT("tcp");
 
 
@@ -1109,9 +1146,14 @@ std::pair<boost::shared_ptr<Queue>, bool
         params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject"));
         params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
         params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
+        params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
+        params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast<string>(settings.maxFileSize)));
 
         if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+
+        if (!acl->approveCreateQueue(userId,name) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
     }
 
     Exchange::shared_ptr alternate;
@@ -1120,21 +1162,12 @@ std::pair<boost::shared_ptr<Queue>, bool
         if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
     }
 
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+    std::pair<Queue::shared_ptr, bool> result =
+        queues.declare(name, settings, alternate, false/*recovering*/,
+                       owner, connectionId, userId);
     if (result.second) {
         //add default binding:
         result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable());
-
-        if (managementAgent.get()) {
-            //TODO: debatable whether we should raise an event here for
-            //create when this is a 'declare' event; ideally add a create
-            //event instead?
-            managementAgent->raiseEvent(
-                _qmf::EventQueueDeclare(connectionId, userId, name,
-                                        settings.durable, owner, settings.autodelete, alternateExchange,
-                                        settings.asMap(),
-                                        "created"));
-        }
         QPID_LOG_CAT(debug, model, "Create queue. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
@@ -1149,6 +1182,10 @@ std::pair<boost::shared_ptr<Queue>, bool
 void Broker::deleteQueue(const std::string& name, const std::string& userId,
                          const std::string& connectionId, QueueFunctor check)
 {
+    QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+                 << " user:" << userId
+                 << " rhost:" << connectionId
+    );
     if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
         throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
     }
@@ -1156,19 +1193,13 @@ void Broker::deleteQueue(const std::stri
     Queue::shared_ptr queue = queues.find(name);
     if (queue) {
         if (check) check(queue);
-        queues.destroy(name);
+        if (acl)
+            acl->recordDestroyQueue(name);
+        queues.destroy(name, connectionId, userId);
         queue->destroyed();
     } else {
         throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
     }
-
-    if (managementAgent.get())
-        managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
-    QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
-        << " user:" << userId
-        << " rhost:" << connectionId
-    );
-
 }
 
 std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1196,33 +1227,16 @@ std::pair<Exchange::shared_ptr, bool> Br
     }
 
     std::pair<Exchange::shared_ptr, bool> result;
-    result = exchanges.declare(name, type, durable, arguments);
+    result = exchanges.declare(
+        name, type, durable, arguments, alternate, connectionId, userId);
     if (result.second) {
-        if (alternate) {
-            result.first->setAlternate(alternate);
-            alternate->incAlternateUsers();
-        }
         if (durable) {
 //            store->create(*result.first, arguments);
             ConfigHandle ch = asyncStore->createConfigHandle();
             result.first->setHandle(ch);
-            boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+            boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
             asyncStore->submitCreate(ch, result.first.get(), bc);
         }
-        if (managementAgent.get()) {
-            //TODO: debatable whether we should raise an event here for
-            //create when this is a 'declare' event; ideally add a create
-            //event instead?
-            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
-                                                         userId,
-                                                         name,
-                                                         type,
-                                                         alternateExchange,
-                                                         durable,
-                                                         false,
-                                                         ManagementAgent::toMap(arguments),
-                                                         "created"));
-        }
         QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
@@ -1236,6 +1250,9 @@ std::pair<Exchange::shared_ptr, bool> Br
 void Broker::deleteExchange(const std::string& name, const std::string& userId,
                            const std::string& connectionId)
 {
+    QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+        << " user:" << userId
+        << " rhost:" << connectionId);
     if (acl) {
         if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1246,21 +1263,15 @@ void Broker::deleteExchange(const std::s
     }
     Exchange::shared_ptr exchange(exchanges.get(name));
     if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
-    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
 //    if (exchange->isDurable()) store->destroy(*exchange);
     if (exchange->isDurable()) {
-        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue));
+        boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue));
         asyncStore->submitDestroy(exchange->getHandle(), bc);
         exchange->resetHandle();
     }
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    exchanges.destroy(name);
-
-    if (managementAgent.get())
-        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-    QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
-        << " user:" << userId
-        << " rhost:" << connectionId);
+    exchanges.destroy(name, connectionId,  userId);
 }
 
 void Broker::bind(const std::string& queueName,
@@ -1298,6 +1309,7 @@ void Broker::bind(const std::string& que
             QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
                 << " queue:" << queueName
                 << " key:" << key
+                << " arguments:" << arguments
                 << " user:" << userId
                 << " rhost:" << connectionId);
         }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Broker.h Thu Feb 28 16:14:30 2013
@@ -25,40 +25,30 @@
 #include "qpid/broker/AsyncResultQueueImpl.h"
 #include "qpid/broker/AsyncStore.h"
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/ConnectionFactory.h"
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DirectExchange.h"
+
+#include "qpid/DataDir.h"
+#include "qpid/Options.h"
+#include "qpid/Plugin.h"
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
 //#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/LinkRegistry.h"
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/QueueCleaner.h"
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
-#include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/broker/ConsumerFactory.h"
 #include "qpid/broker/ConnectionObservers.h"
 #include "qpid/broker/ConfigurationObservers.h"
 #include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/broker/Broker.h"
-#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
-#include "qpid/Options.h"
-#include "qpid/Plugin.h"
-#include "qpid/DataDir.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/types/Variant.h"
-#include "qpid/RefCounted.h"
-#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
 
 #include <boost/intrusive_ptr.hpp>
+
 #include <string>
 #include <vector>
 
@@ -67,12 +57,14 @@ namespace qpid {
 namespace sys {
 class ProtocolFactory;
 class Poller;
+class Timer;
 }
 
 struct Url;
 
 namespace broker {
 
+class AclModule;
 class ConnectionState;
 class ExpiryPolicy;
 class Message;
@@ -103,6 +95,7 @@ class Broker : public sys::Runnable, pub
         bool noDataDir;
         std::string dataDir;
         uint16_t port;
+        std::vector<std::string> listenInterfaces;
         int workerThreads;
         int connectionBacklog;
         bool enableMgmt;
@@ -139,10 +132,15 @@ class Broker : public sys::Runnable, pub
 
     void declareStandardExchange(const std::string& name, const std::string& type);
     void setStore ();
-    static void recoverComplete(const AsyncResultHandle* const);
-    static void configureComplete(const AsyncResultHandle* const);
+    static void recoverCompleteCb(const AsyncResultHandle* const);
+    static void configureCompleteCb(const AsyncResultHandle* const);
+    static Broker* thisBroker;
+    void recoverComplete(const AsyncResultHandle* const arh);
+    void configureComplete(const AsyncResultHandle* const arh);
     void setLogLevel(const std::string& level);
     std::string getLogLevel();
+    void setLogHiresTimestamp(bool enabled);
+    bool getLogHiresTimestamp();
     void createObject(const std::string& type, const std::string& name,
                       const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
     void deleteObject(const std::string& type, const std::string& name,
@@ -158,8 +156,7 @@ class Broker : public sys::Runnable, pub
     Manageable::status_t setTimestampConfig(const bool receive,
                                             const ConnectionState* context);
     boost::shared_ptr<sys::Poller> poller;
-    sys::Timer timer;
-    std::auto_ptr<sys::Timer> clusterTimer;
+    std::auto_ptr<sys::Timer> timer;
     Options config;
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
@@ -178,7 +175,7 @@ class Broker : public sys::Runnable, pub
     boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
-    qmf::org::apache::qpid::broker::Broker* mgmtObject;
+    qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject;
     Vhost::shared_ptr            vhostObject;
     System::shared_ptr           systemObject;
     QueueCleaner queueCleaner;
@@ -187,10 +184,10 @@ class Broker : public sys::Runnable, pub
     bool deferDeliveryImpl(const std::string& queue,
                            const Message& msg);
     std::string federationTag;
-    bool recovery;
-    bool inCluster, clusterUpdatee;
+    bool recoveryInProgress;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConsumerFactories consumerFactories;
+    ProtocolRegistry protocolRegistry;
 
     mutable sys::Mutex linkClientPropertiesLock;
     framing::FieldTable linkClientProperties;
@@ -233,6 +230,7 @@ class Broker : public sys::Runnable, pub
     DataDir& getDataDir() { return dataDir; }
     Options& getOptions() { return config; }
     AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; }
+    ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
 
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
     boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -240,7 +238,7 @@ class Broker : public sys::Runnable, pub
     SessionManager& getSessionManager() { return sessionManager; }
     const std::string& getFederationTag() const { return federationTag; }
 
-    QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const;
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
     QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
     QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
         uint32_t methodId, management::Args& args, std::string& text);
@@ -253,19 +251,17 @@ class Broker : public sys::Runnable, pub
     QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, const std::string& port,
+    void connect(const std::string& name,
+                 const std::string& host, const std::string& port,
                  const std::string& transport,
-                 boost::function2<void, int, std::string> failed,
-                 sys::ConnectionCodec::Factory* =0);
-    /** Create a connection to another broker. */
-    void connect(const Url& url,
-                 boost::function2<void, int, std::string> failed,
-                 sys::ConnectionCodec::Factory* =0);
+                 boost::function2<void, int, std::string> failed);
 
     /** Move messages from one queue to another.
         A zero quantity means to move all messages
+        Return -1 if one of the queues does not exist, otherwise
+               the number of messages moved.
     */
-    QPID_BROKER_EXTERN uint32_t queueMoveMessages(
+    QPID_BROKER_EXTERN int32_t queueMoveMessages(
         const std::string& srcQueue,
         const std::string& destQueue,
         uint32_t  qty,
@@ -277,46 +273,17 @@ class Broker : public sys::Runnable, pub
     /** Expose poller so plugins can register their descriptors. */
     QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller();
 
-    boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
-    void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
-
     /** Timer for local tasks affecting only this broker */
-    sys::Timer& getTimer() { return timer; }
-
-    /** Timer for tasks that must be synchronized if we are in a cluster */
-    sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; }
-    QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr<sys::Timer>);
+    sys::Timer& getTimer() { return *timer; }
 
     boost::function<std::vector<Url> ()> getKnownBrokers;
 
     static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
 
-    void setRecovery(bool set) { recovery = set; }
-    bool getRecovery() const { return recovery; }
-
-    /** True of this broker is part of a cluster.
-     * Only valid after early initialization of plugins is complete.
-     */
-    bool isInCluster() const { return inCluster; }
-    void setInCluster(bool set) { inCluster = set; }
-
-    /** True if this broker is joining a cluster and in the process of
-     * receiving a state update.
-     */
-    bool isClusterUpdatee() const { return clusterUpdatee; }
-    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+    bool inRecovery() const { return recoveryInProgress; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
 
-    /**
-     * Never true in a stand-alone broker. In a cluster, return true
-     * to defer delivery of messages deliveredg in a cluster-unsafe
-     * context.
-     *@return true if delivery of a message should be deferred.
-     */
-    boost::function<bool (const std::string& queue,
-                          const Message& msg)> deferDelivery;
-
     bool isAuthenticating ( ) { return config.auth; }
     bool isTimestamping() { return config.timestampRcvMsgs; }
 
@@ -371,8 +338,10 @@ class Broker : public sys::Runnable, pub
     QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
     QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
 
+    QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; }
     /** Information identifying this system */
     boost::shared_ptr<const System> getSystem() const { return systemObject; }
+  friend class StatusCheckThread;
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigurationObserver.h Thu Feb 28 16:14:30 2013
@@ -38,6 +38,10 @@ class Exchange;
 
 /**
  * Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
  */
 class ConfigurationObserver
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.cpp Thu Feb 28 16:14:30 2013
@@ -25,9 +25,9 @@
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/AclModule.h"
+#include "qpid/management/ManagementAgent.h"
 #include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/ClusterSafe.h"
+#include "qpid/sys/Timer.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
@@ -86,20 +86,14 @@ Connection::Connection(ConnectionOutputH
                        std::string& mgmtId_,
                        const qpid::sys::SecuritySettings& external,
                        bool link_,
-                       uint64_t objectId_,
-                       bool shadow_,
-                       bool delayManagement,
-                       bool authenticated_
+                       uint64_t objectId_
 ) :
     ConnectionState(out_, broker_),
     securitySettings(external),
-    shadow(shadow_),
-    authenticated(authenticated_),
     adapter(*this, link_),
     link(link_),
     mgmtClosing(false),
     mgmtId(mgmtId_),
-    mgmtObject(0),
     links(broker_.getLinks()),
     agent(0),
     timer(broker_.getTimer()),
@@ -109,11 +103,6 @@ Connection::Connection(ConnectionOutputH
 {
     outboundTracker.wrap(out);
     broker.getConnectionObservers().connection(*this);
-    // In a cluster, allow adding the management object to be delayed.
-    if (!delayManagement) addManagementObject();
-}
-
-void Connection::addManagementObject() {
     assert(agent == 0);
     assert(mgmtObject == 0);
     Manageable* parent = broker.GetVhostObject();
@@ -121,8 +110,7 @@ void Connection::addManagementObject() {
         agent = broker.getManagementAgent();
         if (agent != 0) {
             // TODO set last bool true if system connection
-            mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false);
-            mgmtObject->set_shadow(shadow);
+            mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
             agent->addObject(mgmtObject, objectId);
         }
         ConnectionState::setUrl(mgmtId);
@@ -139,13 +127,11 @@ void Connection::requestIOProcessing(boo
 Connection::~Connection()
 {
     if (mgmtObject != 0) {
-        mgmtObject->resourceDestroy();
-        // In a cluster, Connections destroyed during shutdown are in
-        // a cluster-unsafe context. Don't raise an event in that case.
-        if (!link && isClusterSafe())
-            agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+        if (!link)
+            agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
         QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
             << " rhost:" << mgmtId );
+        mgmtObject->resourceDestroy();
     }
     broker.getConnectionObservers().closed(*this);
 
@@ -188,8 +174,7 @@ bool isMessage(const AMQMethodBody* meth
 
 void Connection::recordFromServer(const framing::AMQFrame& frame)
 {
-    // Don't record management stats in cluster-unsafe contexts
-    if (mgmtObject != 0 && isClusterSafe())
+    if (mgmtObject != 0)
     {
         qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
         cStats->framesToClient += 1;
@@ -203,8 +188,7 @@ void Connection::recordFromServer(const 
 
 void Connection::recordFromClient(const framing::AMQFrame& frame)
 {
-    // Don't record management stats in cluster-unsafe contexts
-    if (mgmtObject != 0 && isClusterSafe())
+    if (mgmtObject != 0)
     {
         qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
         cStats->framesFromClient += 1;
@@ -279,28 +263,7 @@ void Connection::notifyConnectionForced(
 
 void Connection::setUserId(const string& userId)
 {
-    // Account for changing userId
-    AclModule* acl = broker.getAcl();
-    if (acl)
-    {
-        acl->setUserId(*this, userId);
-    }
-
     ConnectionState::setUserId(userId);
-    // In a cluster, the cluster code will raise the connect event
-    // when the connection is replicated to the cluster.
-    if (!broker.isInCluster()) raiseConnectEvent();
-}
-
-void Connection::raiseConnectEvent() {
-    if (mgmtObject != 0) {
-        mgmtObject->set_authIdentity(userId);
-        agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
-    }
-
-    QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
-        << " rhost:" << mgmtId );
-
 }
 
 void Connection::setUserProxyAuth(bool b)
@@ -327,19 +290,6 @@ void Connection::close(connection::Close
     getOutput().close();
 }
 
-// Send a close to the client but keep the channels. Used by cluster.
-void Connection::sendClose() {
-    if (heartbeatTimer)
-        heartbeatTimer->cancel();
-    if (timeoutTimer)
-        timeoutTimer->cancel();
-    if (linkHeartbeatTimer) {
-        linkHeartbeatTimer->cancel();
-    }
-    adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
-    getOutput().close();
-}
-
 void Connection::idleOut(){}
 
 void Connection::idleIn(){}
@@ -364,9 +314,6 @@ void Connection::closed(){ // Physically
 void Connection::doIoCallbacks() {
     if (!isOpen()) return; // Don't process IO callbacks until we are open.
     ScopedLock<Mutex> l(ioCallbackLock);
-    // Although IO callbacks execute in the connection thread context, they are
-    // not cluster safe because they are queued for execution in non-IO threads.
-    ClusterUnsafeScope cus;
     while (!ioCallbacks.empty()) {
         boost::function0<void> cb = ioCallbacks.front();
         ioCallbacks.pop();
@@ -413,9 +360,9 @@ SessionHandler& Connection::getChannel(C
     return *ptr_map_ptr(i);
 }
 
-ManagementObject* Connection::GetManagementObject(void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
 {
-    return (ManagementObject*) mgmtObject;
+    return mgmtObject;
 }
 
 Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&)
@@ -499,7 +446,7 @@ void Connection::abort()
 void Connection::setHeartbeatInterval(uint16_t heartbeat)
 {
     setHeartbeat(heartbeat);
-    if (heartbeat > 0 && !isShadow()) {
+    if (heartbeat > 0) {
         if (!heartbeatTimer) {
             heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
             timer.add(heartbeatTimer);
@@ -535,7 +482,6 @@ void Connection::OutboundFrameTracker::c
 size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
 void Connection::OutboundFrameTracker::abort() { next->abort(); }
 void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
-void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
 void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
 {
     next->send(f);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Connection.h Thu Feb 28 16:14:30 2013
@@ -30,24 +30,13 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/ConnectionHandler.h"
 #include "qpid/broker/ConnectionState.h"
-#include "qpid/broker/SessionHandler.h"
-#include "qmf/org/apache/qpid/broker/Connection.h"
-#include "qpid/Exception.h"
-#include "qpid/RefCounted.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/ptr_map.h"
-#include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/TimeoutHandler.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/RefCounted.h"
+#include "qpid/ptr_map.h"
+
+#include "qmf/org/apache/qpid/broker/Connection.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 #include <boost/bind.hpp>
@@ -55,11 +44,17 @@
 #include <algorithm>
 
 namespace qpid {
+namespace sys {
+class Timer;
+class TimerTask;
+}
 namespace broker {
 
 class Broker;
 class LinkRegistry;
+class Queue;
 class SecureConnection;
+class SessionHandler;
 struct ConnectionTimeoutTask;
 
 class Connection : public sys::ConnectionInputHandler,
@@ -83,10 +78,7 @@ class Connection : public sys::Connectio
                const std::string& mgmtId,
                const qpid::sys::SecuritySettings&,
                bool isLink = false,
-               uint64_t objectId = 0,
-               bool shadow=false,
-               bool delayManagement = false,
-               bool authenticated=true);
+               uint64_t objectId = 0);
 
     ~Connection ();
 
@@ -112,7 +104,7 @@ class Connection : public sys::Connectio
     void closeChannel(framing::ChannelId channel);
 
     // Manageable entry points
-    management::ManagementObject* GetManagementObject (void) const;
+    management::ManagementObject::shared_ptr GetManagementObject(void) const;
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
 
@@ -130,7 +122,6 @@ class Connection : public sys::Connectio
 
     void notifyConnectionForced(const std::string& text);
     void setUserId(const std::string& uid);
-    void raiseConnectEvent();
 
     // credentials for connected client
     const std::string& getUserId() const { return ConnectionState::getUserId(); }
@@ -144,27 +135,14 @@ class Connection : public sys::Connectio
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
     void restartTimeout();
-    
+
     template <class F> void eachSessionHandler(F f) {
         for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
             f(*ptr_map_ptr(i));
     }
 
-    void sendClose();
     void setSecureConnection(SecureConnection* secured);
 
-    /** True if this is a shadow connection in a cluster. */
-    bool isShadow() const { return shadow; }
-
-    /** True if this connection is authenticated */
-    bool isAuthenticated() const { return authenticated; }
-
-    // Used by cluster to update connection status
-    sys::AggregateOutput& getOutputTasks() { return outputTasks; }
-
-    /** Cluster delays adding management object in the constructor then calls this. */
-    void addManagementObject();
-
     const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
     {
         return securitySettings;
@@ -176,9 +154,6 @@ class Connection : public sys::Connectio
     bool isLink() { return link; }
     void startLinkHeartbeatTimeoutTask();
 
-    // Used by cluster during catch-up, see cluster::OutputInterceptor
-    void doIoCallbacks();
-
     void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
     const framing::FieldTable& getClientProperties() const { return clientProperties; }
 
@@ -188,15 +163,13 @@ class Connection : public sys::Connectio
 
     ChannelMap channels;
     qpid::sys::SecuritySettings securitySettings;
-    bool shadow;
-    bool authenticated;
     ConnectionHandler adapter;
     const bool link;
     bool mgmtClosing;
     const std::string mgmtId;
     sys::Mutex ioCallbackLock;
     std::queue<boost::function0<void> > ioCallbacks;
-    qmf::org::apache::qpid::broker::Connection* mgmtObject;
+    qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
     LinkRegistry& links;
     management::ManagementAgent* agent;
     sys::Timer& timer;
@@ -218,7 +191,6 @@ class Connection : public sys::Connectio
         size_t getBuffered() const;
         void abort();
         void activateOutput();
-        void giveReadCredit(int32_t credit);
         void send(framing::AMQFrame&);
         void wrap(sys::ConnectionOutputHandlerPtr&);
       private:
@@ -228,10 +200,11 @@ class Connection : public sys::Connectio
     OutboundFrameTracker outboundTracker;
 
     void sent(const framing::AMQFrame& f);
+    void doIoCallbacks();
 
   public:
 
-    qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
+    qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
 };
 
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Feb 28 16:14:30 2013
@@ -20,8 +20,10 @@
  *
  */
 
-#include "qpid/SaslFactory.h"
 #include "qpid/broker/ConnectionHandler.h"
+
+#include "qpid/SaslFactory.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/Url.h"
@@ -30,8 +32,10 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
 #include "qpid/sys/SecurityLayer.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qmf/org/apache/qpid/broker/EventClientConnectFail.h"
 
 using namespace qpid;
@@ -148,6 +152,24 @@ void ConnectionHandler::Handler::startOk
 
 void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
 {
+    const framing::FieldTable& clientProperties = body.getClientProperties();
+    qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
+
+    if (mgmtObject != 0) {
+        string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
+        uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
+        uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
+
+        types::Variant::Map properties;
+        qpid::amqp_0_10::translate(clientProperties, properties);
+        mgmtObject->set_remoteProperties(properties);
+        if (!procName.empty())
+            mgmtObject->set_remoteProcessName(procName);
+        if (pid != 0)
+            mgmtObject->set_remotePid(pid);
+        if (ppid != 0)
+            mgmtObject->set_remoteParentPid(ppid);
+    }
     try {
         authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
     } catch (std::exception& /*e*/) {
@@ -160,8 +182,9 @@ void ConnectionHandler::Handler::startOk
             string uid;
             authenticator->getError(error);
             authenticator->getUid(uid);
-            if (agent) {
-                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            if (agent && mgmtObject) {
+                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error,
+                                                               mgmtObject->get_remoteProperties()));
             }
             QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
                 << " user:" << uid
@@ -169,9 +192,8 @@ void ConnectionHandler::Handler::startOk
         }
         throw;
     }
-    const framing::FieldTable& clientProperties = body.getClientProperties();
-    connection.setClientProperties(clientProperties);
 
+    connection.setClientProperties(clientProperties);
     connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
     if (clientProperties.isSet(QPID_FED_TAG)) {
         connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
@@ -187,19 +209,6 @@ void ConnectionHandler::Handler::startOk
         }
         QPID_LOG(info, "Connection is a federation link");
     }
-
-    if (connection.getMgmtObject() != 0) {
-        string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
-        uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
-        uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
-
-        if (!procName.empty())
-            connection.getMgmtObject()->set_remoteProcessName(procName);
-        if (pid != 0)
-            connection.getMgmtObject()->set_remotePid(pid);
-        if (ppid != 0)
-            connection.getMgmtObject()->set_remoteParentPid(ppid);
-    }
 }
 
 void ConnectionHandler::Handler::secureOk(const string& response)
@@ -216,8 +225,9 @@ void ConnectionHandler::Handler::secureO
             string uid;
             authenticator->getError(error);
             authenticator->getUid(uid);
-            if (agent) {
-                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error));
+            if (agent && connection.getMgmtObject()) {
+                agent->raiseEvent(_qmf::EventClientConnectFail(connection.getMgmtId(), uid, error,
+                                                               connection.getMgmtObject()->get_remoteProperties()));
             }
             QPID_LOG_CAT(debug, model, "Failed connection. rhost:" << connection.getMgmtId()
                 << " user:" << uid

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConnectionState.h Thu Feb 28 16:14:30 2013
@@ -21,18 +21,22 @@
 #ifndef _ConnectionState_
 #define _ConnectionState_
 
-#include <vector>
-
+#include "qpid/broker/ConnectionToken.h"
 #include "qpid/sys/AggregateOutput.h"
 #include "qpid/sys/ConnectionOutputHandlerPtr.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/Url.h"
-#include "qpid/broker/Broker.h"
+
+#include <boost/function.hpp>
+#include <vector>
+
 
 namespace qpid {
 namespace broker {
 
+class Broker;
+
 class ConnectionState : public ConnectionToken, public management::Manageable
 {
   protected:
@@ -46,9 +50,8 @@ class ConnectionState : public Connectio
         framemax(65535),
         heartbeat(0),
         heartbeatmax(120),
-        userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
+        userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links)
         federationLink(true),
-        clusterOrderOut(0),
         isDefaultRealm(false)
     {}
 
@@ -62,14 +65,7 @@ class ConnectionState : public Connectio
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
 
-    virtual void setUserId(const std::string& uid) {
-        userId = uid;
-        size_t at = userId.find('@');
-        userName = userId.substr(0, at);
-        isDefaultRealm = (
-            at!= std::string::npos &&
-            getBroker().getOptions().realm == userId.substr(at+1,userId.size()));
-    }
+    virtual void setUserId(const std::string& uid);
 
     const std::string& getUserId() const { return userId; }
 
@@ -102,15 +98,6 @@ class ConnectionState : public Connectio
     framing::ProtocolVersion getVersion() const { return version; }
     void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
 
-    /**
-     * If the broker is part of a cluster, this is a handler provided
-     * by cluster code. It ensures consistent ordering of commands
-     * that are sent based on criteria that are not predictably
-     * ordered cluster-wide, e.g. a timer firing.
-     */
-    framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
-    void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
-
     virtual void requestIOProcessing (boost::function0<void>) = 0;
 
   protected:
@@ -124,7 +111,6 @@ class ConnectionState : public Connectio
     bool federationLink;
     std::string federationPeerTag;
     std::vector<Url> knownHosts;
-    framing::FrameHandler* clusterOrderOut;
     std::string userName;
     bool isDefaultRealm;
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Consumer.h Thu Feb 28 16:14:30 2013
@@ -79,6 +79,15 @@ class Consumer : public QueueCursor {
      */
     virtual bool hideDeletedError() { return false; }
 
+    /** If false, the consumer is not counted for purposes of auto-deletion or
+     * immediate messages. This is used for "system" consumers that are created
+     * by the broker for internal purposes as opposed to consumers that are
+     * created by normal clients.
+     */
+    virtual bool isCounted() { return true; }
+
+    QueueCursor getCursor() const { return *this; }
+    void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; }
   protected:
     //framing::SequenceNumber position;
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConsumerFactory.h Thu Feb 28 16:14:30 2013
@@ -25,11 +25,14 @@
 // TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
 // Refactor to use a more abstract interface.
 
-#include "qpid/broker/SemanticState.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
 
+class SemanticState;
+class SemanticStateConsumerImpl;
+
 /**
  * Base class for consumer factoires. Plugins can register a
  * ConsumerFactory via Broker:: getConsumerFactories() Each time a
@@ -41,7 +44,7 @@ class ConsumerFactory
   public:
     virtual ~ConsumerFactory() {}
 
-    virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+    virtual boost::shared_ptr<SemanticStateConsumerImpl> create(
         SemanticState* parent,
         const std::string& name, boost::shared_ptr<Queue> queue,
         bool ack, bool acquire, bool exclusive, const std::string& tag,

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.cpp Thu Feb 28 16:14:30 2013
@@ -36,8 +36,3 @@ Message& DeliverableMessage::getMessage(
 {
     return msg;
 }
-
-uint64_t DeliverableMessage::contentSize()
-{
-    return msg.getContentSize();
-}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DeliverableMessage.h Thu Feb 28 16:14:30 2013
@@ -36,7 +36,6 @@ namespace qpid {
             QPID_BROKER_EXTERN DeliverableMessage(const Message& msg, TxBuffer* txn);
             QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
             QPID_BROKER_EXTERN Message& getMessage();
-            QPID_BROKER_EXTERN uint64_t contentSize();
             virtual ~DeliverableMessage(){}
         };
     }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DirectExchange.cpp Thu Feb 28 16:14:30 2013
@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_
 
     if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
         Mutex::ScopedLock l(lock);
-        Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+        Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin));
         BoundKey& bk = bindings[routingKey];
         if (exclusiveBinding) bk.queues.clear();
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.cpp Thu Feb 28 16:14:30 2013
@@ -27,6 +27,9 @@
 #include "qpid/ptr_map.h"
 
 #include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
 #include <iostream>
 
 using boost::intrusive_ptr;
@@ -35,6 +38,30 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
+namespace {
+    typedef boost::function0<void> FireFunction;
+    struct DtxCleanup : public qpid::sys::TimerTask
+    {
+        FireFunction fireFunction;
+
+        DtxCleanup(uint32_t timeout, FireFunction f);
+        void fire();
+    };
+
+    DtxCleanup::DtxCleanup(uint32_t _timeout, FireFunction f)
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), fireFunction(f){}
+
+    void DtxCleanup::fire()
+    {
+        try {
+            fireFunction();
+        } catch (qpid::ConnectionException& /*e*/) {
+            //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+        }
+    }
+
+}
+
 //DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
 DtxManager::DtxManager(qpid::sys::Timer& t) : asyncTxnStore(0), timer(&t) {}
 
@@ -158,19 +185,7 @@ void DtxManager::timedout(const std::str
     } else {
         ptr_map_ptr(i)->timedout();
         //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion?
-        //timer.add(intrusive_ptr<TimerTask>(new DtxCleanup(60*30/*30 mins*/, *this, xid)));
-    }
-}
-
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
-    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
-
-void DtxManager::DtxCleanup::fire()
-{
-    try {
-        mgr.remove(xid);
-    } catch (ConnectionException& /*e*/) {
-        //assume it was explicitly cleaned up after a call to prepare, commit or rollback
+        //timer->add(new DtxCleanup(60*30/*30 mins*/, boost::bind(&DtxManager::remove, this, xid)));
     }
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxManager.h Thu Feb 28 16:14:30 2013
@@ -32,20 +32,15 @@
 #include "qpid/ptr_map.h"
 
 namespace qpid {
+namespace sys {
+class Timer;
+}
+
 namespace broker {
 
 class DtxManager{
     typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
 
-    struct DtxCleanup : public sys::TimerTask
-    {
-        DtxManager& mgr;
-        const std::string& xid;
-
-        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
-        void fire();
-    };
-
     WorkMap work;
 //    TransactionalStore* store;
     AsyncTransactionalStore* asyncTxnStore;
@@ -71,11 +66,6 @@ public:
     void setStore(AsyncTransactionalStore* const ats);
     void setTimer(sys::Timer& t) { timer = &t; }
 
-    // Used by cluster for replication.
-    template<class F> void each(F f) const {
-        for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
-            f(*ptr_map_ptr(i));
-    }
     DtxWorkRecord* getWork(const std::string& xid);
     bool exists(const std::string& xid);
     static std::string convert(const framing::Xid& xid);

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.cpp Thu Feb 28 16:14:30 2013
@@ -20,7 +20,10 @@
  */
 #include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/DtxManager.h"
+#include "qpid/broker/DtxTimeout.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/Timer.h"
+
 #include <boost/format.hpp>
 #include <boost/mem_fn.hpp>
 using boost::mem_fn;
@@ -39,6 +42,12 @@ DtxWorkRecord::~DtxWorkRecord()
     }
 }
 
+void DtxWorkRecord::setTimeout(boost::intrusive_ptr<DtxTimeout> t)
+{ timeout = t; }
+
+boost::intrusive_ptr<DtxTimeout> DtxWorkRecord::getTimeout()
+{ return timeout; }
+
 bool DtxWorkRecord::prepare()
 {
     Mutex::ScopedLock locker(lock);
@@ -182,17 +191,3 @@ void DtxWorkRecord::timedout()
     }
     abort();
 }
-
-size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
-    Work::iterator i = std::find(work.begin(), work.end(), buf);
-    if (i == work.end()) throw NotFoundException(
-        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
-    return i - work.begin();
-}
-
-DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
-    if (i > work.size())
-        throw NotFoundException(
-            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
-    return work[i];
-}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/DtxWorkRecord.h Thu Feb 28 16:14:30 2013
@@ -24,7 +24,6 @@
 #include "qpid/broker/AsyncStore.h"
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/TransactionalStore.h"
 
 #include "qpid/framing/amqp_types.h"
@@ -39,6 +38,8 @@
 namespace qpid {
 namespace broker {
 
+struct DtxTimeout;
+
 /**
  * Represents the work done under a particular distributed transaction
  * across potentially multiple channels. Identified by a xid. Allows
@@ -74,19 +75,13 @@ public:
     QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops);
     void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
     void timedout();
-    void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
-    boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+    void setTimeout(boost::intrusive_ptr<DtxTimeout> t);
+    boost::intrusive_ptr<DtxTimeout> getTimeout();
     std::string getXid() const { return xid; }
     bool isCompleted() const { return completed; }
     bool isRolledback() const { return rolledback; }
     bool isPrepared() const { return prepared; }
     bool isExpired() const { return expired; }
-
-    // Used by cluster update;
-    size_t size() const { return work.size(); }
-    DtxBuffer::shared_ptr operator[](size_t i) const;
-    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
-    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
 }} // qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.cpp Thu Feb 28 16:14:30 2013
@@ -167,19 +167,19 @@ void Exchange::routeIVE(){
 
 Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
     name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
-    sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+    sequenceNo(0), ive(false), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)
     {
         ManagementAgent* agent = broker->getManagementAgent();
         if (agent != 0)
         {
-            mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+            mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
             mgmtExchange->set_durable(durable);
             mgmtExchange->set_autoDelete(false);
             agent->addObject(mgmtExchange, 0, durable);
             if (broker)
-                brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
         }
     }
 }
@@ -187,20 +187,20 @@ Exchange::Exchange (const string& _name,
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent, Broker* b)
     : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
-      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+      args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false)
 {
     if (parent != 0 && broker != 0)
     {
         ManagementAgent* agent = broker->getManagementAgent();
         if (agent != 0)
         {
-            mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+            mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
             mgmtExchange->set_durable(durable);
             mgmtExchange->set_autoDelete(false);
             mgmtExchange->set_arguments(ManagementAgent::toMap(args));
             agent->addObject(mgmtExchange, 0, durable);
             if (broker)
-                brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
         }
     }
 
@@ -212,8 +212,6 @@ Exchange::Exchange(const string& _name, 
 
     ive = _args.get(qpidIVE);
     if (ive) {
-        if (broker && broker->isInCluster())
-            throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
         QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
     }
 }
@@ -227,6 +225,7 @@ Exchange::~Exchange ()
 void Exchange::setAlternate(Exchange::shared_ptr _alternate)
 {
     alternate = _alternate;
+    alternate->incAlternateUsers();
     if (mgmtExchange != 0) {
         if (alternate.get() != 0)
             mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
@@ -296,9 +295,9 @@ void Exchange::recoveryComplete(Exchange
     }
 }
 
-ManagementObject* Exchange::GetManagementObject (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
 {
-    return (ManagementObject*) mgmtExchange;
+    return mgmtExchange;
 }
 
 void Exchange::registerDynamicBridge(DynamicBridge* db, AsyncStore* const store)
@@ -347,16 +346,16 @@ void Exchange::propagateFedOp(const stri
 
 Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
                            FieldTable _args, const string& _origin, ConfigHandle _cfgHandle)
-    : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle), mgmtBinding(0)
+    : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), cfgHandle(_cfgHandle)
 {
 }
 
 Exchange::Binding::~Binding ()
 {
     if (mgmtBinding != 0) {
-        ManagementObject* mo = queue->GetManagementObject();
+        _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
         if (mo != 0)
-            static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
+            mo->dec_bindingCount();
         mgmtBinding->resourceDestroy ();
     }
 }
@@ -369,25 +368,25 @@ void Exchange::Binding::startManagement(
         if (broker != 0) {
             ManagementAgent* agent = broker->getManagementAgent();
             if (agent != 0) {
-                ManagementObject* mo = queue->GetManagementObject();
+                _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
                 if (mo != 0) {
                     management::ObjectId queueId = mo->getObjectId();
 
-                    mgmtBinding = new _qmf::Binding
-                        (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
+                    mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding
+                        (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)));
                     if (!origin.empty())
                         mgmtBinding->set_origin(origin);
                     agent->addObject(mgmtBinding);
-                    static_cast<_qmf::Queue*>(mo)->inc_bindingCount();
+                    mo->inc_bindingCount();
                 }
             }
         }
     }
 }
 
-ManagementObject* Exchange::Binding::GetManagementObject () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
 {
-    return (ManagementObject*) mgmtBinding;
+    return mgmtBinding;
 }
 
 uint64_t Exchange::Binding::getSize() { return 0; } // TODO: kpvdr: implement persistence
@@ -434,5 +433,10 @@ bool Exchange::routeWithAlternate(Delive
     return msg.delivered;
 }
 
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+    args = newArgs;
+    if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
 }}
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Exchange.h Thu Feb 28 16:14:30 2013
@@ -55,14 +55,14 @@ public:
         const framing::FieldTable args;
         std::string               origin;
         ConfigHandle              cfgHandle;
-        qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+        qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding;
 
         Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
                 framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string(),
                 ConfigHandle cfgHandle = ConfigHandle());
         ~Binding();
         void startManagement();
-        management::ManagementObject* GetManagementObject() const;
+        management::ManagementObject::shared_ptr GetManagementObject() const;
 
         // DataSource implementation - allows for persistence
         uint64_t getSize();
@@ -170,8 +170,8 @@ protected:
         }
     };
 
-    qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
-    qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
+    qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange;
+    qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
 
 public:
     typedef boost::shared_ptr<Exchange> shared_ptr;
@@ -184,7 +184,8 @@ public:
 
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
-    qpid::framing::FieldTable& getArgs() { return args; }
+    QPID_BROKER_EXTERN const qpid::framing::FieldTable& getArgs() const { return args; }
+    QPID_BROKER_EXTERN void setArgs(const framing::FieldTable&);
 
     QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
     QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
@@ -221,7 +222,7 @@ public:
     static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
 
     // Manageable entry points
-    QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const;
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
 
     // Federation hooks
     class DynamicBridge {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Feb 28 16:14:30 2013
@@ -29,20 +29,26 @@
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::pair;
 using std::string;
 using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
 
 pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
 
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
-                                                           bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+    const string& name, const string& type, bool durable, const FieldTable& args,
+    Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
     Exchange::shared_ptr exchange;
     std::pair<Exchange::shared_ptr, bool> result;
     {
@@ -73,31 +79,55 @@ pair<Exchange::shared_ptr, bool> Exchang
             }
             exchanges[name] = exchange;
             result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+            if (alternate) exchange->setAlternate(alternate);
+            // Call exchangeCreate inside the lock to ensure correct ordering.
+            if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
         } else {
             result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
         }
+        if (broker && broker->getManagementAgent()) {
+            // Call raiseEvent inside the lock to ensure correct ordering.
+            broker->getManagementAgent()->raiseEvent(
+                _qmf::EventExchangeDeclare(
+                    connectionId,
+                    userId,
+                    name,
+                    type,
+                    alternate ? alternate->getName() : string(),
+                    durable,
+                    false,
+                    ManagementAgent::toMap(result.first->getArgs()),
+                    result.second ? "created" : "existing"));
+        }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
     return result;
 }
 
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+    const string& name, const string& connectionId, const string& userId)
+{
     if (name.empty() ||
         (name.find("amq.") == 0 &&
          (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
         name == "qpid.management")
         throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
-    Exchange::shared_ptr exchange;
     {
         RWlock::ScopedWlock locker(lock);
         ExchangeMap::iterator i =  exchanges.find(name);
         if (i != exchanges.end()) {
-            exchange = i->second;
+            if (broker) {
+                // Call exchangeDestroy and raiseEvent inside the lock to ensure
+                // correct ordering.
+                broker->getConfigurationObservers().exchangeDestroy(i->second);
+                if (broker->getManagementAgent())
+                    broker->getManagementAgent()->raiseEvent(
+                        _qmf::EventExchangeDelete(connectionId, userId, name));
+            }
             i->second->destroy();
             exchanges.erase(i);
+
         }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
 }
 
 Exchange::shared_ptr ExchangeRegistry::find(const string& name){

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ExchangeRegistry.h Thu Feb 28 16:14:30 2013
@@ -46,14 +46,23 @@ class ExchangeRegistry{
                              bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
 
     ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name, const std::string& type);
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name,
-       const std::string& type, 
-       bool durable,
-       const qpid::framing::FieldTable& args = framing::FieldTable());
-    QPID_BROKER_EXTERN void destroy(const std::string& name);
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name, const std::string& type);
+
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const qpid::framing::FieldTable& args = framing::FieldTable(),
+        Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
+    QPID_BROKER_EXTERN void destroy(
+        const std::string& name,
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
     QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
 
     /**

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/FanOutExchange.cpp Thu Feb 28 16:14:30 2013
@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_
     bool propagate = false;
 
     if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
-        Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+        Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin));
         if (bindings.add_unless(binding, MatchQueue(queue))) {
             binding->startManagement();
             propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org