You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [7/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.cpp Fri Oct 21 14:42:12 2011
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
@@ -31,12 +32,26 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/MessageGroupManager.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Logger.h"
@@ -44,7 +59,9 @@
 #include "qpid/log/Statement.h"
 #include "qpid/log/posix/SinkOptions.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/ProtocolFactory.h"
 #include "qpid/sys/Poller.h"
@@ -76,7 +93,10 @@ using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using qpid::management::getManagementExecutionContext;
+using qpid::types::Variant;
 using std::string;
+using std::make_pair;
 
 namespace _qmf = qmf::org::apache::qpid::broker;
 
@@ -103,7 +123,12 @@ Broker::Options::Options(const std::stri
     maxSessionRate(0),
     asyncQueueEvents(false),     // Must be false in a cluster.
     qmf2Support(true),
-    qmf1Support(true)
+    qmf1Support(true),
+    queueFlowStopRatio(80),
+    queueFlowResumeRatio(70),
+    queueThresholdEventRatio(80),
+    defaultMsgGroup("qpid.no-group"),
+    timestampRcvMsgs(false)     // set the 0.10 timestamp delivery property
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -134,9 +159,14 @@ Broker::Options::Options(const std::stri
         ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
         ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
-        ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
+        ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location")
         ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
-        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+        ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
+        ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
+        ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
+        ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
+        ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.");
 }
 
 const std::string empty;
@@ -166,9 +196,10 @@ Broker::Broker(const Broker::Options& co
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
         *this),
-    queueCleaner(queues, timer),
-    queueEvents(poller,!conf.asyncQueueEvents), 
+    queueCleaner(queues, &timer),
+    queueEvents(poller,!conf.asyncQueueEvents),
     recovery(true),
+    inCluster(false),
     clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     connectionCounter(conf.maxConnections),
@@ -225,8 +256,11 @@ Broker::Broker(const Broker::Options& co
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
+    QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
+    MessageGroupManager::setDefaults(conf.defaultMsgGroup);
+
     // If no plugin store module registered itself, set up the null store.
-    if (NullMessageStore::isNullStore(store.get())) 
+    if (NullMessageStore::isNullStore(store.get()))
         setStore();
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -271,6 +305,11 @@ Broker::Broker(const Broker::Options& co
     else
         QPID_LOG(info, "Management not enabled");
 
+    // this feature affects performance, so let's be sure that gets logged!
+    if (conf.timestampRcvMsgs) {
+        QPID_LOG(notice, "Receive message timestamping is ENABLED.");
+    }
+
     /**
      * SASL setup, can fail and terminate startup
      */
@@ -345,14 +384,14 @@ void Broker::run() {
         Dispatcher d(poller);
         int numIOThreads = config.workerThreads;
         std::vector<Thread> t(numIOThreads-1);
-        
+
         // Run n-1 io threads
         for (int i=0; i<numIOThreads-1; ++i)
             t[i] = Thread(d);
-        
+
         // Run final thread
         d.run();
-        
+
         // Now wait for n-1 io threads to exit
         for (int i=0; i<numIOThreads-1; ++i) {
             t[i].join();
@@ -399,9 +438,9 @@ Manageable::status_t Broker::ManagementM
     {
     case _qmf::Broker::METHOD_ECHO :
         QPID_LOG (debug, "Broker::echo("
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence 
-                  << ", " 
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body 
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+                  << ", "
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
                   << ")");
         status = Manageable::STATUS_OK;
         break;
@@ -409,8 +448,9 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerConnect& hp=
             dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
 
-        QPID_LOG (debug, "Broker::connect()");
         string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
+        QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
+                        "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
         if (!getProtocolFactory(transport)) {
             QPID_LOG(error, "Transport '" << transport << "' not supported");
             return  Manageable::STATUS_NOT_IMPLEMENTED;
@@ -427,9 +467,9 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
             dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
         QPID_LOG (debug, "Broker::queueMoveMessages()");
-	if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+        if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
             status = Manageable::STATUS_OK;
-	else
+        else
             return Manageable::STATUS_PARAMETER_INVALID;
         break;
       }
@@ -443,6 +483,38 @@ Manageable::status_t Broker::ManagementM
         QPID_LOG (debug, "Broker::getLogLevel()");
         status = Manageable::STATUS_OK;
         break;
+    case _qmf::Broker::METHOD_CREATE :
+      {
+          _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
+          createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
+    case _qmf::Broker::METHOD_DELETE :
+      {
+          _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
+          deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
+    case _qmf::Broker::METHOD_QUERY :
+      {
+          _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
+          status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext());
+          break;
+      }
+    case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
+        {
+          _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
+          status = getTimestampConfig(a.o_receive, getManagementExecutionContext());
+          break;
+        }
+    case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
+        {
+          _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
+          status = setTimestampConfig(a.i_receive, getManagementExecutionContext());
+          break;
+        }
    default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,6 +524,240 @@ Manageable::status_t Broker::ManagementM
     return status;
 }
 
+namespace
+{
+const std::string TYPE_QUEUE("queue");
+const std::string TYPE_EXCHANGE("exchange");
+const std::string TYPE_TOPIC("topic");
+const std::string TYPE_BINDING("binding");
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QUEUE_NAME("queue");
+const std::string EXCHANGE_NAME("exchange");
+
+const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
+
+const std::string _TRUE("true");
+const std::string _FALSE("false");
+}
+
+struct InvalidBindingIdentifier : public qpid::Exception
+{
+    InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "invalid binding"; }
+};
+
+struct BindingIdentifier
+{
+    std::string exchange;
+    std::string queue;
+    std::string key;
+
+    BindingIdentifier(const std::string& name)
+    {
+        std::vector<std::string> path;
+        split(path, name, "/");
+        switch (path.size()) {
+          case 1:
+            queue = path[0];
+            break;
+          case 2:
+            exchange = path[0];
+            queue = path[1];
+            break;
+          case 3:
+            exchange = path[0];
+            queue = path[1];
+            key = path[2];
+            break;
+          default:
+            throw InvalidBindingIdentifier(name);
+        }
+    }
+};
+
+struct ObjectAlreadyExists : public qpid::Exception
+{
+    ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "object already exists"; }
+};
+
+struct UnknownObjectType : public qpid::Exception
+{
+    UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
+    std::string getPrefix() const { return "unknown object type"; }
+};
+
+void Broker::createObject(const std::string& type, const std::string& name,
+                          const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    //TODO: implement 'strict' option (check there are no unrecognised properties)
+    QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
+    if (type == TYPE_QUEUE) {
+        bool durable(false);
+        bool autodelete(false);
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == AUTO_DELETE) autodelete = i->second;
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+        if (!result.second) {
+            throw ObjectAlreadyExists(name);
+        }
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        bool durable(false);
+        std::string exchangeType("topic");
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        try {
+            std::pair<boost::shared_ptr<Exchange>, bool> result =
+                createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+            if (!result.second) {
+                throw ObjectAlreadyExists(name);
+            }
+        } catch (const UnknownExchangeTypeException&) {
+            throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
+        }
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        std::string exchangeType("topic");
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+}
+
+void Broker::deleteObject(const std::string& type, const std::string& name,
+                          const Variant::Map& options, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
+    if (type == TYPE_QUEUE) {
+        deleteQueue(name, userId, connectionId);
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        deleteExchange(name, userId, connectionId);
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+
+}
+
+Manageable::status_t Broker::queryObject(const std::string& type,
+                                         const std::string& name,
+                                         Variant::Map& results,
+                                         const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
+
+    if (type == TYPE_QUEUE)
+        return queryQueue( name, userId, connectionId, results );
+
+    if (type == TYPE_EXCHANGE ||
+        type == TYPE_TOPIC ||
+        type == TYPE_BINDING)
+        return Manageable::STATUS_NOT_IMPLEMENTED;
+
+    throw UnknownObjectType(type);
+}
+
+Manageable::status_t Broker::queryQueue( const std::string& name,
+                                         const std::string& userId,
+                                         const std::string& /*connectionId*/,
+                                         Variant::Map& results )
+{
+    (void) results;
+    if (acl) {
+        if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
+    }
+
+    boost::shared_ptr<Queue> q(queues.find(name));
+    if (!q) {
+        QPID_LOG(error, "Query failed: queue not found, name=" << name);
+        return Manageable::STATUS_UNKNOWN_OBJECT;
+    }
+    q->query( results );
+    return Manageable::STATUS_OK;;
+}
+
+Manageable::status_t Broker::getTimestampConfig(bool& receive,
+                                                const ConnectionState* context)
+{
+    std::string name;   // none needed for broker
+    std::string userId = context->getUserId();
+    if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL))  {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
+    }
+    receive = config.timestampRcvMsgs;
+    return Manageable::STATUS_OK;
+}
+
+Manageable::status_t Broker::setTimestampConfig(const bool receive,
+                                                const ConnectionState* context)
+{
+    std::string name;   // none needed for broker
+    std::string userId = context->getUserId();
+    if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
+    }
+    config.timestampRcvMsgs = receive;
+    QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
+    return Manageable::STATUS_OK;
+}
+
 void Broker::setLogLevel(const std::string& level)
 {
     QPID_LOG(notice, "Changing log level to " << level);
@@ -466,7 +772,7 @@ std::string Broker::getLogLevel()
     const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
     for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
         if (i != selectors.begin()) level += std::string(",");
-        level += *i;        
+        level += *i;
     }
     return level;
 }
@@ -499,7 +805,7 @@ void Broker::accept() {
 }
 
 void Broker::connect(
-    const std::string& host, uint16_t port, const std::string& transport,
+    const std::string& host, const std::string& port, const std::string& transport,
     boost::function2<void, int, std::string> failed,
     sys::ConnectionCodec::Factory* f)
 {
@@ -515,13 +821,14 @@ void Broker::connect(
 {
     url.throwIfEmpty();
     const Address& addr=url[0];
-    connect(addr.host, addr.port, addr.protocol, failed, f);
+    connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f);
 }
 
 uint32_t Broker::queueMoveMessages(
      const std::string& srcQueue,
      const std::string& destQueue,
-     uint32_t  qty)
+     uint32_t  qty,
+     const Variant::Map& filter)
 {
   Queue::shared_ptr src_queue = queues.find(srcQueue);
   if (!src_queue)
@@ -530,7 +837,7 @@ uint32_t Broker::queueMoveMessages(
   if (!dest_queue)
     return 0;
 
-  return src_queue->move(dest_queue, qty);
+  return src_queue->move(dest_queue, qty, &filter);
 }
 
 
@@ -548,9 +855,228 @@ bool Broker::deferDeliveryImpl(const std
 
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
     clusterTimer = t;
+    queueCleaner.setTimer(clusterTimer.get());
+    dtxManager.setTimer(*clusterTimer.get());
 }
 
 const std::string Broker::TCP_TRANSPORT("tcp");
 
+
+std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
+    const std::string& name,
+    bool durable,
+    bool autodelete,
+    const OwnershipToken* owner,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments);
+    if (result.second) {
+        //add default binding:
+        result.first->bind(exchanges.getDefault(), name);
+
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(
+                _qmf::EventQueueDeclare(connectionId, userId, name,
+                                        durable, owner, autodelete,
+                                        ManagementAgent::toMap(arguments),
+                                        "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+                         const std::string& connectionId, QueueFunctor check)
+{
+    if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(name);
+    if (queue) {
+        if (check) check(queue);
+        queues.destroy(name);
+        queue->destroyed();
+    } else {
+        throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
+    }
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+
+}
+
+std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
+    const std::string& name,
+    const std::string& type,
+    bool durable,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_TYPE, type));
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, _FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Exchange::shared_ptr, bool> result;
+    result = exchanges.declare(name, type, durable, arguments);
+    if (result.second) {
+        if (alternate) {
+            result.first->setAlternate(alternate);
+            alternate->incAlternateUsers();
+        }
+        if (durable) {
+            store->create(*result.first, arguments);
+        }
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
+                                                         userId,
+                                                         name,
+                                                         type,
+                                                         alternateExchange,
+                                                         durable,
+                                                         false,
+                                                         ManagementAgent::toMap(arguments),
+                                                         "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteExchange(const std::string& name, const std::string& userId,
+                           const std::string& connectionId)
+{
+    if (acl) {
+        if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
+    }
+
+    if (name.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange"));
+    }
+    Exchange::shared_ptr exchange(exchanges.get(name));
+    if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
+    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->isDurable()) store->destroy(*exchange);
+    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+    exchanges.destroy(name);
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+
+}
+
+void Broker::bind(const std::string& queueName,
+                  const std::string& exchangeName,
+                  const std::string& key,
+                  const qpid::framing::FieldTable& arguments,
+                  const std::string& userId,
+                  const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+
+        if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
+    }
+    if (exchangeName.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange"));
+    }
+
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+    } else {
+        if (queue->bind(exchange, key, arguments)) {
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
+                                                  queueName, key, ManagementAgent::toMap(arguments)));
+            }
+        }
+    }
+}
+
+void Broker::unbind(const std::string& queueName,
+                    const std::string& exchangeName,
+                    const std::string& key,
+                    const std::string& userId,
+                    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+        if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
+    }
+    if (exchangeName.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange"));
+    }
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
+    } else {
+        if (exchange->unbind(queue, key, 0)) {
+            if (exchange->isDurable() && queue->isDurable()) {
+                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+            }
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
+            }
+        }
+    }
+}
+
 }} // namespace qpid::broker
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Broker.h Fri Oct 21 14:42:12 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,6 +49,7 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Timer.h"
+#include "qpid/types/Variant.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/sys/Mutex.h"
@@ -57,7 +58,7 @@
 #include <string>
 #include <vector>
 
-namespace qpid { 
+namespace qpid {
 
 namespace sys {
     class ProtocolFactory;
@@ -68,6 +69,7 @@ struct Url;
 
 namespace broker {
 
+class ConnectionState;
 class ExpiryPolicy;
 class Message;
 
@@ -80,7 +82,7 @@ struct NoSuchTransportException : qpid::
 };
 
 /**
- * A broker instance. 
+ * A broker instance.
  */
 class Broker : public sys::Runnable, public Plugin::Target,
                public management::Manageable,
@@ -116,29 +118,34 @@ public:
         bool asyncQueueEvents;
         bool qmf2Support;
         bool qmf1Support;
+        uint queueFlowStopRatio;    // producer flow control: on
+        uint queueFlowResumeRatio;  // producer flow control: off
+        uint16_t queueThresholdEventRatio;
+        std::string defaultMsgGroup;
+        bool timestampRcvMsgs;
 
       private:
         std::string getHome();
     };
-    
+
     class ConnectionCounter {
             int maxConnections;
             int connectionCount;
             sys::Mutex connectionCountLock;
         public:
             ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
-            void inc_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            void inc_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount++;
-            } 
-            void dec_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            }
+            void dec_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount--;
             }
             bool allowConnection() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 return (maxConnections <= connectionCount);
-            } 
+            }
     };
 
   private:
@@ -148,7 +155,20 @@ public:
     void setStore ();
     void setLogLevel(const std::string& level);
     std::string getLogLevel();
-
+    void createObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
+    void deleteObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& options, const ConnectionState* context);
+    Manageable::status_t queryObject(const std::string& type, const std::string& name,
+                                     qpid::types::Variant::Map& results, const ConnectionState* context);
+    Manageable::status_t queryQueue( const std::string& name,
+                                     const std::string& userId,
+                                     const std::string& connectionId,
+                                     qpid::types::Variant::Map& results);
+    Manageable::status_t getTimestampConfig(bool& receive,
+                                            const ConnectionState* context);
+    Manageable::status_t setTimestampConfig(const bool receive,
+                                            const ConnectionState* context);
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
     std::auto_ptr<sys::Timer> clusterTimer;
@@ -176,10 +196,10 @@ public:
                            const boost::intrusive_ptr<Message>& msg);
     std::string federationTag;
     bool recovery;
-    bool clusterUpdatee;
+    bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
-    
+
   public:
     virtual ~Broker();
 
@@ -235,7 +255,7 @@ public:
     QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, uint16_t port, 
+    void connect(const std::string& host, const std::string& port,
                  const std::string& transport,
                  boost::function2<void, int, std::string> failed,
                  sys::ConnectionCodec::Factory* =0);
@@ -247,9 +267,10 @@ public:
     /** Move messages from one queue to another.
         A zero quantity means to move all messages
     */
-    uint32_t queueMoveMessages( const std::string& srcQueue, 
+    uint32_t queueMoveMessages( const std::string& srcQueue,
 			    const std::string& destQueue,
-			    uint32_t  qty); 
+                uint32_t  qty,
+                const qpid::types::Variant::Map& filter);
 
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
 
@@ -273,11 +294,20 @@ public:
     void setRecovery(bool set) { recovery = set; }
     bool getRecovery() const { return recovery; }
 
-    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+    /** True of this broker is part of a cluster.
+     * Only valid after early initialization of plugins is complete.
+     */
+    bool isInCluster() const { return inCluster; }
+    void setInCluster(bool set) { inCluster = set; }
+
+    /** True if this broker is joining a cluster and in the process of
+     * receiving a state update.
+     */
     bool isClusterUpdatee() const { return clusterUpdatee; }
+    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-    
+
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}
 
     /**
@@ -290,6 +320,43 @@ public:
                           const boost::intrusive_ptr<Message>& msg)> deferDelivery;
 
     bool isAuthenticating ( ) { return config.auth; }
+    bool isTimestamping() { return config.timestampRcvMsgs; }
+
+    typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
+
+    std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+        const std::string& name,
+        bool durable,
+        bool autodelete,
+        const OwnershipToken* owner,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& arguments,
+        const std::string& userId,
+        const std::string& connectionId);
+    void deleteQueue(const std::string& name,
+                     const std::string& userId,
+                     const std::string& connectionId,
+                     QueueFunctor check = QueueFunctor());
+    std::pair<Exchange::shared_ptr, bool> createExchange(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& args,
+        const std::string& userId, const std::string& connectionId);
+    void deleteExchange(const std::string& name, const std::string& userId,
+                        const std::string& connectionId);
+    void bind(const std::string& queue,
+              const std::string& exchange,
+              const std::string& key,
+              const qpid::framing::FieldTable& arguments,
+              const std::string& userId,
+              const std::string& connectionId);
+    void unbind(const std::string& queue,
+                const std::string& exchange,
+                const std::string& key,
+                const std::string& userId,
+                const std::string& connectionId);
 };
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/BrokerImportExport.h Fri Oct 21 14:42:12 2011
@@ -20,14 +20,23 @@
  * under the License.
  */
 
-#if defined(WIN32) && !defined(QPID_BROKER_STATIC)
-#if defined(BROKER_EXPORT) || defined (qpidbroker_EXPORTS)
-#define QPID_BROKER_EXTERN __declspec(dllexport)
+#if defined(WIN32) && !defined(QPID_DECLARE_STATIC)
+#  if defined(BROKER_EXPORT) || defined (qpidbroker_EXPORTS)
+#    define QPID_BROKER_EXTERN __declspec(dllexport)
+#  else
+#    define QPID_BROKER_EXTERN __declspec(dllimport)
+#  endif
+#  ifdef _MSC_VER
+#    define QPID_BROKER_CLASS_EXTERN
+#    define QPID_BROKER_INLINE_EXTERN QPID_BROKER_EXTERN
+#  else
+#    define QPID_BROKER_CLASS_EXTERN QPID_BROKER_EXTERN
+#    define QPID_BROKER_INLINE_EXTERN
+#  endif
 #else
-#define QPID_BROKER_EXTERN __declspec(dllimport)
-#endif
-#else
-#define QPID_BROKER_EXTERN 
+#  define QPID_BROKER_EXTERN
+#  define QPID_BROKER_CLASS_EXTERN
+#  define QPID_BROKER_INLINE_EXTERN
 #endif
 
 #endif

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.cpp Fri Oct 21 14:42:12 2011
@@ -156,16 +156,7 @@ Connection::~Connection()
 void Connection::received(framing::AMQFrame& frame) {
     // Received frame on connection so delay timeout
     restartTimeout();
-
-    if (frame.getChannel() == 0 && frame.getMethod()) {
-        adapter.handle(frame);
-    } else {
-        if (adapter.isOpen())
-            getChannel(frame.getChannel()).in(frame);
-        else
-            close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received.");
-    }
-
+    adapter.handle(frame);
     if (isLink) //i.e. we are acting as the client to another broker
         recordFromServer(frame);
     else
@@ -278,8 +269,7 @@ void Connection::setUserId(const string&
     ConnectionState::setUserId(userId);
     // In a cluster, the cluster code will raise the connect event
     // when the connection is replicated to the cluster.
-    if (!sys::isCluster())
-        raiseConnectEvent();
+    if (!broker.isInCluster()) raiseConnectEvent();
 }
 
 void Connection::raiseConnectEvent() {
@@ -289,11 +279,11 @@ void Connection::raiseConnectEvent() {
     }
 }
 
-void Connection::setFederationLink(bool b)
+void Connection::setUserProxyAuth(bool b)
 {
-    ConnectionState::setFederationLink(b);
+    ConnectionState::setUserProxyAuth(b);
     if (mgmtObject != 0)
-            mgmtObject->set_federationLink(b);
+        mgmtObject->set_userProxyAuth(b);
 }
 
 void Connection::close(connection::CloseCode code, const string& text)
@@ -332,31 +322,30 @@ void Connection::closed(){ // Physically
     try {
         while (!channels.empty())
             ptr_map_ptr(channels.begin())->handleDetach();
-        while (!exclusiveQueues.empty()) {
-            boost::shared_ptr<Queue> q(exclusiveQueues.front());
-            q->releaseExclusiveOwnership();
-            if (q->canAutoDelete()) {
-                Queue::tryAutoDelete(broker, q);
-            }
-            exclusiveQueues.erase(exclusiveQueues.begin());
-        }
     } catch(std::exception& e) {
         QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
         assert(0);
     }
 }
 
+void Connection::doIoCallbacks() {
+    {
+        ScopedLock<Mutex> l(ioCallbackLock);
+        // Although IO callbacks execute in the connection thread context, they are
+        // not cluster safe because they are queued for execution in non-IO threads.
+        ClusterUnsafeScope cus;
+        while (!ioCallbacks.empty()) {
+            boost::function0<void> cb = ioCallbacks.front();
+            ioCallbacks.pop();
+            ScopedUnlock<Mutex> ul(ioCallbackLock);
+            cb(); // Lend the IO thread for management processing
+        }
+    }
+}
+
 bool Connection::doOutput() {
     try {
-        {
-            ScopedLock<Mutex> l(ioCallbackLock);
-            while (!ioCallbacks.empty()) {
-                boost::function0<void> cb = ioCallbacks.front();
-                ioCallbacks.pop();
-                ScopedUnlock<Mutex> ul(ioCallbackLock);
-                cb(); // Lend the IO thread for management processing
-            }
-        }
+        doIoCallbacks();
         if (mgmtClosing) {
             closed();
             close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
@@ -476,8 +465,8 @@ void Connection::OutboundFrameTracker::a
 void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
 void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
 void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
-{ 
-    next->send(f); 
+{
+    next->send(f);
     con.sent(f);
 }
 void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Connection.h Fri Oct 21 14:42:12 2011
@@ -125,7 +125,7 @@ class Connection : public sys::Connectio
     const std::string& getUserId() const { return ConnectionState::getUserId(); }
     const std::string& getMgmtId() const { return mgmtId; }
     management::ManagementAgent* getAgent() const { return agent; }
-    void setFederationLink(bool b);
+    void setUserProxyAuth(bool b);
     /** Connection does not delete the listener. 0 resets. */
     void setErrorListener(ErrorListener* l) { errorListener=l; }
     ErrorListener* getErrorListener() { return errorListener; }
@@ -153,13 +153,16 @@ class Connection : public sys::Connectio
     void addManagementObject();
 
     const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
-    { 
+    {
         return securitySettings;
     }
 
     /** @return true if the initial connection negotiation is complete. */
     bool isOpen();
 
+    // Used by cluster during catch-up, see cluster::OutputInterceptor
+    void doIoCallbacks();
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
@@ -201,7 +204,7 @@ class Connection : public sys::Connectio
         sys::ConnectionOutputHandler* next;
     };
     OutboundFrameTracker outboundTracker;
-    
+
 
     void sent(const framing::AMQFrame& f);
   public:

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Oct 21 14:42:12 2011
@@ -26,6 +26,7 @@
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/Url.h"
 #include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
 #include "qpid/framing/enum.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/SecurityLayer.h"
@@ -63,13 +64,31 @@ void ConnectionHandler::heartbeat()
     handler->proxy.heartbeat();
 }
 
+bool ConnectionHandler::handle(const framing::AMQMethodBody& method)
+{
+    //Need special handling for start-ok, in order to distinguish
+    //between null and empty response
+    if (method.isA<ConnectionStartOkBody>()) {
+        handler->startOk(dynamic_cast<const ConnectionStartOkBody&>(method));
+        return true;
+    } else {
+        return invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), method);
+    }
+}
+
 void ConnectionHandler::handle(framing::AMQFrame& frame)
 {
     AMQMethodBody* method=frame.getBody()->getMethod();
     Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
     try{
-        if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
+        if (method && handle(*method)) {
+            // This is a connection control frame, nothing more to do.
+        } else if (isOpen()) {
             handler->connection.getChannel(frame.getChannel()).in(frame);
+        } else {
+            handler->proxy.close(
+                connection::CLOSE_CODE_FRAMING_ERROR,
+                "Connection not yet open, invalid frame received.");
         }
     }catch(ConnectionException& e){
         if (errorListener) errorListener->connectionError(e.what());
@@ -89,13 +108,10 @@ ConnectionHandler::ConnectionHandler(Con
 
 ConnectionHandler::Handler::Handler(Connection& c, bool isClient, bool isShadow) :
     proxy(c.getOutput()),
-    connection(c), serverMode(!isClient), acl(0), secured(0),
+    connection(c), serverMode(!isClient), secured(0),
     isOpen(false)
 {
     if (serverMode) {
-
-    	acl =  connection.getBroker().getAcl();
-
         FieldTable properties;
         Array mechanisms(0x95);
 
@@ -118,13 +134,20 @@ ConnectionHandler::Handler::Handler(Conn
 ConnectionHandler::Handler::~Handler() {}
 
 
-void ConnectionHandler::Handler::startOk(const framing::FieldTable& clientProperties,
-                                         const string& mechanism,
-                                         const string& response,
+void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
+                                         const string& /*mechanism*/,
+                                         const string& /*response*/,
                                          const string& /*locale*/)
 {
+    //Need special handling for start-ok, in order to distinguish
+    //between null and empty response -> should never use this method
+    assert(false);
+}
+
+void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
+{
     try {
-        authenticator->start(mechanism, response);
+        authenticator->start(body.getMechanism(), body.hasResponse() ? &body.getResponse() : 0);
     } catch (std::exception& /*e*/) {
         management::ManagementAgent* agent = connection.getAgent();
         if (agent) {
@@ -136,9 +159,14 @@ void ConnectionHandler::Handler::startOk
         }
         throw;
     }
+    const framing::FieldTable& clientProperties = body.getClientProperties();
     connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
-    connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+    if (clientProperties.isSet(QPID_FED_TAG)) {
+        connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+    }
     if (connection.isFederationLink()) {
+        AclModule* acl =  connection.getBroker().getAcl();
+        FieldTable properties;
     	if (acl && !acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){
             proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied creating a federation link");
             return;
@@ -183,7 +211,7 @@ void ConnectionHandler::Handler::secureO
 void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
     uint16_t framemax, uint16_t heartbeat)
 {
-    connection.setFrameMax(framemax);
+    if (framemax) connection.setFrameMax(framemax);
     connection.setHeartbeatInterval(heartbeat);
 }
 
@@ -256,7 +284,6 @@ void ConnectionHandler::Handler::start(c
                                                   false ); // disallow interaction
     }
     std::string supportedMechanismsList;
-    bool requestedMechanismIsSupported = false;
     Array::const_iterator i;
 
     /*
@@ -269,11 +296,9 @@ void ConnectionHandler::Handler::start(c
             if (i != supportedMechanisms.begin())
                 supportedMechanismsList += SPACE;
             supportedMechanismsList += (*i)->get<std::string>();
-            requestedMechanismIsSupported = true;
         }
     }
     else {
-        requestedMechanismIsSupported = false;
         /*
           The caller has requested a mechanism.  If it's available,
           make sure it ends up at the head of the list.
@@ -282,7 +307,6 @@ void ConnectionHandler::Handler::start(c
             string currentMechanism = (*i)->get<std::string>();
 
             if ( requestedMechanism == currentMechanism ) {
-                requestedMechanismIsSupported = true;
                 supportedMechanismsList = currentMechanism + SPACE + supportedMechanismsList;
             } else {
                 if (i != supportedMechanisms.begin())
@@ -292,7 +316,9 @@ void ConnectionHandler::Handler::start(c
         }
     }
 
-    connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+    if (serverProperties.isSet(QPID_FED_TAG)) {
+        connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+    }
 
     FieldTable ft;
     ft.setInt(QPID_FED_LINK,1);
@@ -301,11 +327,21 @@ void ConnectionHandler::Handler::start(c
     string response;
     if (sasl.get()) {
         const qpid::sys::SecuritySettings& ss = connection.getExternalSecuritySettings();
-        response = sasl->start ( requestedMechanism.empty()
-                                 ? supportedMechanismsList
-                                 : requestedMechanism,
-                                 & ss );
-        proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+        if (sasl->start ( requestedMechanism.empty()
+                          ? supportedMechanismsList
+                          : requestedMechanism,
+                          response,
+                          & ss )) {
+            proxy.startOk ( ft, sasl->getMechanism(), response, en_US );
+        } else {
+            //response was null
+            ConnectionStartOkBody body;
+            body.setClientProperties(ft);
+            body.setMechanism(sasl->getMechanism());
+            //Don't set response, as none was given
+            body.setLocale(en_US);
+            proxy.send(body);
+        }
     }
     else {
         response = ((char)0) + username + ((char)0) + password;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionHandler.h Fri Oct 21 14:42:12 2011
@@ -26,8 +26,10 @@
 #include "qpid/broker/SaslAuthenticator.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AMQP_AllProxy.h"
+#include "qpid/framing/ConnectionStartOkBody.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
@@ -57,12 +59,12 @@ class ConnectionHandler : public framing
         Connection& connection;
         bool serverMode;
         std::auto_ptr<SaslAuthenticator> authenticator;
-        AclModule* acl;
         SecureConnection* secured;
         bool isOpen;
 
         Handler(Connection& connection, bool isClient, bool isShadow=false);
         ~Handler();
+        void startOk(const qpid::framing::ConnectionStartOkBody& body);
         void startOk(const qpid::framing::FieldTable& clientProperties,
                      const std::string& mechanism, const std::string& response,
                      const std::string& locale);
@@ -96,7 +98,7 @@ class ConnectionHandler : public framing
     };
     std::auto_ptr<Handler> handler;
 
-
+    bool handle(const qpid::framing::AMQMethodBody& method);
   public:
     ConnectionHandler(Connection& connection, bool isClient, bool isShadow=false );
     void close(framing::connection::CloseCode code, const std::string& text);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ConnectionState.h Fri Oct 21 14:42:12 2011
@@ -46,6 +46,7 @@ class ConnectionState : public Connectio
         framemax(65535),
         heartbeat(0),
         heartbeatmax(120),
+        userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
         federationLink(true),
         clientSupportsThrottling(false),
         clusterOrderOut(0)
@@ -67,8 +68,10 @@ class ConnectionState : public Connectio
     void setUrl(const std::string& _url) { url = _url; }
     const std::string& getUrl() const { return url; }
 
-    void setFederationLink(bool b) {  federationLink = b; }
-    bool isFederationLink() const { return federationLink; }
+    void setUserProxyAuth(const bool b) { userProxyAuth = b; }
+    bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
+    void setFederationLink(bool b) { federationLink = b; } // deprecated - use setFederationPeerTag() instead
+    bool isFederationLink() const { return federationPeerTag.size() > 0; }
     void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
     const std::string& getFederationPeerTag() const { return federationPeerTag; }
     std::vector<Url>& getKnownHosts() { return knownHosts; }
@@ -79,7 +82,6 @@ class ConnectionState : public Connectio
     Broker& getBroker() { return broker; }
 
     Broker& broker;
-    std::vector<boost::shared_ptr<Queue> > exclusiveQueues;
 
     //contained output tasks
     sys::AggregateOutput outputTasks;
@@ -106,6 +108,7 @@ class ConnectionState : public Connectio
     uint16_t heartbeatmax;
     std::string userId;
     std::string url;
+    bool userProxyAuth;
     bool federationLink;
     std::string federationPeerTag;
     std::vector<Url> knownHosts;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Consumer.h Fri Oct 21 14:42:12 2011
@@ -29,22 +29,33 @@ namespace qpid {
 namespace broker {
 
 class Queue;
+class QueueListeners;
 
 class Consumer {
     const bool acquires;
-  public:
-    typedef boost::shared_ptr<Consumer> shared_ptr;            
-    
+    // inListeners allows QueueListeners to efficiently track if this instance is registered
+    // for notifications without having to search its containers
+    bool inListeners;
+    // the name is generated by broker and is unique within broker scope.  It is not
+    // provided or known by the remote Consumer.
+    const std::string name;
+ public:
+    typedef boost::shared_ptr<Consumer> shared_ptr;
+
     framing::SequenceNumber position;
-    
-    Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+
+    Consumer(const std::string& _name, bool preAcquires = true)
+      : acquires(preAcquires), inListeners(false), name(_name), position(0) {}
     bool preAcquires() const { return acquires; }
+    const std::string& getName() const { return name; }
+
     virtual bool deliver(QueuedMessage& msg) = 0;
     virtual void notify() = 0;
     virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
     virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
     virtual OwnershipToken* getSession() = 0;
     virtual ~Consumer(){}
+    friend class QueueListeners;
 };
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Daemon.cpp Fri Oct 21 14:42:12 2011
@@ -93,11 +93,10 @@ void Daemon::fork()
         catch (const exception& e) {
             QPID_LOG(critical, "Unexpected error: " << e.what());
             uint16_t port = 0;
-            int unused_ret;    //Supress warning about ignoring return value.
-            unused_ret = write(pipeFds[1], &port, sizeof(uint16_t));
+            (void) write(pipeFds[1], &port, sizeof(uint16_t));
 
             std::string pipeFailureMessage = e.what();
-            unused_ret = write ( pipeFds[1], 
+            (void) write ( pipeFds[1], 
                     pipeFailureMessage.c_str(), 
                     strlen(pipeFailureMessage.c_str())
                   );

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliverableMessage.h Fri Oct 21 14:42:12 2011
@@ -29,7 +29,7 @@
 
 namespace qpid {
     namespace broker {
-        class DeliverableMessage : public Deliverable{
+        class QPID_BROKER_CLASS_EXTERN DeliverableMessage : public Deliverable{
             boost::intrusive_ptr<Message> msg;
         public:
             QPID_BROKER_EXTERN DeliverableMessage(const boost::intrusive_ptr<Message>& msg);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 21 14:42:12 2011
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::Fr
 {
     id = deliveryId;
     if (msg.payload->getRedelivered()){
-        msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+        msg.payload->setRedelivered();
     }
     msg.payload->adjustTtl();
 
@@ -131,18 +131,20 @@ void DeliveryRecord::committed() const{
 
 void DeliveryRecord::reject() 
 {    
-    Exchange::shared_ptr alternate = queue->getAlternateExchange();
-    if (alternate) {
-        DeliverableMessage delivery(msg.payload);
-        alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
-        QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " 
-                 << alternate->getName());
-    } else {
-        //just drop it
-        QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+    if (acquired && !ended) {
+        Exchange::shared_ptr alternate = queue->getAlternateExchange();
+        if (alternate) {
+            DeliverableMessage delivery(msg.payload);
+            alternate->routeWithAlternate(delivery);
+            QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+                     << alternate->getName());
+        } else {
+            //just drop it
+            QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+        }
+        dequeue();
+        setEnded();
     }
-
-    dequeue();
 }
 
 uint32_t DeliveryRecord::getCredit() const
@@ -151,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() con
 }
 
 void DeliveryRecord::acquire(DeliveryIds& results) {
-    if (queue->acquire(msg)) {
+    if (queue->acquire(msg, tag)) {
         acquired = true;
         results.push_back(id);
         if (!acceptExpected) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 21 14:42:12 2011
@@ -46,7 +46,7 @@ class DeliveryRecord
 {
     QueuedMessage msg;
     mutable boost::shared_ptr<Queue> queue;
-    std::string tag;
+    std::string tag;    // name of consumer
     DeliveryId id;
     bool acquired : 1;
     bool acceptExpected : 1;
@@ -90,7 +90,7 @@ class DeliveryRecord
 
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }
-    bool isRedundant() const { return ended && (!windowing || completed); }
+    bool isRedundant() const { return ended && (!windowing || completed || cancelled); }
     bool isCancelled() const { return cancelled; }
     bool isAccepted() const { return !acceptExpected; }
     bool isEnded() const { return ended; }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DirectExchange.cpp Fri Oct 21 14:42:12 2011
@@ -94,7 +94,7 @@ bool DirectExchange::bind(Queue::shared_
 
         propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
         if (bk.fedBinding.countFedBindings(queue->getName()) == 0)
-            unbind(queue, routingKey, 0);
+            unbind(queue, routingKey, args);
 
     } else if (fedOp == fedOpReorigin) {
         /** gather up all the keys that need rebinding in a local vector
@@ -124,20 +124,24 @@ bool DirectExchange::bind(Queue::shared_
     return true;
 }
 
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
 {
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
     bool propagate = false;
 
-    QPID_LOG(debug, "Unbind key [" << routingKey << "] from queue " << queue->getName());
-
+    QPID_LOG(debug, "Unbinding key [" << routingKey << "] from queue " << queue->getName()
+             << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
     {
         Mutex::ScopedLock l(lock);
         BoundKey& bk = bindings[routingKey];
         if (bk.queues.remove_if(MatchQueue(queue))) {
-            propagate = bk.fedBinding.delOrigin();
+            propagate = bk.fedBinding.delOrigin(queue->getName(), fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->dec_bindingCount();
             }
+            if (bk.queues.empty()) {
+                bindings.erase(routingKey);
+            }
         } else {
             return false;
         }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.cpp Fri Oct 21 14:42:12 2011
@@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::Sequ
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
 }
 
+DtxAck::DtxAck(DeliveryRecords& unacked) {
+    pending = unacked;
+}
+
 bool DtxAck::prepare(TransactionContext* ctxt) throw()
 {
     try{

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxAck.h Fri Oct 21 14:42:12 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DTXACK_H
+#define QPID_BROKER_DTXACK_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +21,6 @@
  * under the License.
  *
  */
-#ifndef _DtxAck_
-#define _DtxAck_
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -29,20 +29,21 @@
 #include "qpid/broker/TxOp.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxAck : public TxOp{
-            DeliveryRecords pending;
+namespace broker {
+class DtxAck : public TxOp{
+    DeliveryRecords pending;
 
-        public:
-            DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
-            virtual bool prepare(TransactionContext* ctxt) throw();
-            virtual void commit() throw();
-            virtual void rollback() throw();
-            virtual ~DtxAck(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-        };
-    }
-}
+  public:
+    DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+    DtxAck(DeliveryRecords& unacked);
+    virtual bool prepare(TransactionContext* ctxt) throw();
+    virtual void commit() throw();
+    virtual void rollback() throw();
+    virtual ~DtxAck(){}
+    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+    const DeliveryRecords& getPending() const { return pending; }
+};
 
+}} // qpid::broker
 
-#endif
+#endif  /*!QPID_BROKER_DTXACK_H*/

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.cpp Fri Oct 21 14:42:12 2011
@@ -23,8 +23,11 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer(const std::string& _xid) 
-    : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
+DtxBuffer::DtxBuffer(
+    const std::string& _xid,
+    bool ended_, bool suspended_, bool failed_, bool expired_)
+    : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_)
+{}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -34,7 +37,7 @@ void DtxBuffer::markEnded() 
     ended = true; 
 }
 
-bool DtxBuffer::isEnded() 
+bool DtxBuffer::isEnded() const
 { 
     Mutex::ScopedLock locker(lock); 
     return ended; 
@@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSusp
     suspended = isSuspended; 
 }
 
-bool DtxBuffer::isSuspended() 
+bool DtxBuffer::isSuspended() const
 { 
     return suspended; 
 }
@@ -58,13 +61,13 @@ void DtxBuffer::fail()
     ended = true;
 }
 
-bool DtxBuffer::isRollbackOnly()
+bool DtxBuffer::isRollbackOnly() const
 {
     Mutex::ScopedLock locker(lock); 
     return failed;
 }
 
-const std::string& DtxBuffer::getXid()
+std::string DtxBuffer::getXid() const
 {
     return xid;
 }
@@ -76,8 +79,13 @@ void DtxBuffer::timedout()
     fail();
 }
 
-bool DtxBuffer::isExpired()
+bool DtxBuffer::isExpired() const
 {
     Mutex::ScopedLock locker(lock); 
     return expired;
 }
+
+bool DtxBuffer::isFailed() const
+{
+    return failed;
+}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxBuffer.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,31 +26,34 @@
 #include "qpid/sys/Mutex.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxBuffer : public TxBuffer{
-            sys::Mutex lock;
-            const std::string xid;
-            bool ended;
-            bool suspended;           
-            bool failed;
-            bool expired;
-
-        public:
-            typedef boost::shared_ptr<DtxBuffer> shared_ptr;
-
-            QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = "");
-            QPID_BROKER_EXTERN ~DtxBuffer();
-            QPID_BROKER_EXTERN void markEnded();
-            bool isEnded();
-            void setSuspended(bool suspended);
-            bool isSuspended();
-            void fail();
-            bool isRollbackOnly();
-            void timedout();
-            bool isExpired();
-            const std::string& getXid();
-        };
-    }
+namespace broker {
+class DtxBuffer : public TxBuffer{
+    mutable sys::Mutex lock;
+    const std::string xid;
+    bool ended;
+    bool suspended;
+    bool failed;
+    bool expired;
+
+  public:
+    typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+    QPID_BROKER_EXTERN DtxBuffer(
+        const std::string& xid = "",
+        bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
+    QPID_BROKER_EXTERN ~DtxBuffer();
+    QPID_BROKER_EXTERN void markEnded();
+    bool isEnded() const;
+    void setSuspended(bool suspended);
+    bool isSuspended() const;
+    void fail();
+    bool isRollbackOnly() const;
+    void timedout();
+    bool isExpired() const;
+    bool isFailed() const;
+    std::string getXid() const;
+};
+}
 }
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
 
 DtxManager::~DtxManager() {}
 
@@ -53,8 +53,8 @@ void DtxManager::recover(const std::stri
     createWork(xid)->recover(txn, ops);
 }
 
-bool DtxManager::prepare(const std::string& xid) 
-{ 
+bool DtxManager::prepare(const std::string& xid)
+{
     QPID_LOG(debug, "preparing: " << xid);
     try {
         return getWork(xid)->prepare();
@@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::stri
     }
 }
 
-bool DtxManager::commit(const std::string& xid, bool onePhase) 
-{ 
+bool DtxManager::commit(const std::string& xid, bool onePhase)
+{
     QPID_LOG(debug, "committing: " << xid);
     try {
         bool result = getWork(xid)->commit(onePhase);
@@ -77,8 +77,8 @@ bool DtxManager::commit(const std::strin
     }
 }
 
-void DtxManager::rollback(const std::string& xid) 
-{ 
+void DtxManager::rollback(const std::string& xid)
+{
     QPID_LOG(debug, "rolling back: " << xid);
     try {
         getWork(xid)->rollback();
@@ -91,7 +91,7 @@ void DtxManager::rollback(const std::str
 
 DtxWorkRecord* DtxManager::getWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const
     return ptr_map_ptr(i);
 }
 
+bool DtxManager::exists(const std::string& xid) {
+    Mutex::ScopedLock locker(lock);
+    return  work.find(xid) != work.end();
+}
+
 void DtxManager::remove(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -110,14 +115,15 @@ void DtxManager::remove(const std::strin
     }
 }
 
-DtxWorkRecord* DtxManager::createWork(std::string xid)
+DtxWorkRecord* DtxManager::createWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i != work.end()) {
         throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
     } else {
-      return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
+        std::string ncxid = xid; // Work around const correctness problems in ptr_map.
+        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
     }
 }
 
@@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::s
     }
     timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
     record->setTimeout(timeout);
-    timer.add(timeout);
+    timer->add(timeout);
 }
 
 uint32_t DtxManager::getTimeout(const std::string& xid)
@@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const st
 
 void DtxManager::timedout(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         QPID_LOG(warning, "Transaction timeout failed: no record for xid");
@@ -153,7 +159,7 @@ void DtxManager::timedout(const std::str
     }
 }
 
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
     : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
 
 void DtxManager::DtxCleanup::fire()

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxManager.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,8 +26,8 @@
 #include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/ptr_map.h"
 
 namespace qpid {
 namespace broker {
@@ -39,22 +39,21 @@ class DtxManager{
     {
         DtxManager& mgr;
         const std::string& xid;
-        
-        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+
+        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
         void fire();
     };
 
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
-    qpid::sys::Timer& timer;
+    qpid::sys::Timer* timer;
 
     void remove(const std::string& xid);
-    DtxWorkRecord* getWork(const std::string& xid);
-    DtxWorkRecord* createWork(std::string xid);
+    DtxWorkRecord* createWork(const std::string& xid);
 
 public:
-    DtxManager(qpid::sys::Timer&);
+    DtxManager(sys::Timer&);
     ~DtxManager();
     void start(const std::string& xid, DtxBuffer::shared_ptr work);
     void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -66,6 +65,15 @@ public:
     uint32_t getTimeout(const std::string& xid);
     void timedout(const std::string& xid);
     void setStore(TransactionalStore* store);
+    void setTimer(sys::Timer& t) { timer = &t; }
+
+    // Used by cluster for replication.
+    template<class F> void each(F f) const {
+        for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
+            f(*ptr_map_ptr(i));
+    }
+    DtxWorkRecord* getWork(const std::string& xid);
+    bool exists(const std::string& xid);
 };
 
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.cpp Fri Oct 21 14:42:12 2011
@@ -25,7 +25,7 @@
 using namespace qpid::broker;
 
 DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
-    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid)
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid)
 {
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxTimeout.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,7 +29,9 @@ namespace broker {
 
 class DtxManager;
 
-struct DtxTimeoutException : public Exception {};
+struct DtxTimeoutException : public Exception {
+    DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {}
+};
 
 struct DtxTimeout : public sys::TimerTask
 {
@@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTas
     DtxManager& mgr;
     const std::string xid;
 
-    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
     void fire();
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,19 +28,19 @@ using qpid::sys::Mutex;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
     xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
 
-DtxWorkRecord::~DtxWorkRecord() 
+DtxWorkRecord::~DtxWorkRecord()
 {
-    if (timeout.get()) {  
+    if (timeout.get()) {
         timeout->cancel();
     }
 }
 
 bool DtxWorkRecord::prepare()
 {
-    Mutex::ScopedLock locker(lock);     
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         txn = store->begin(xid);
         if (prepare(txn.get())) {
@@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
 
 bool DtxWorkRecord::commit(bool onePhase)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         if (prepared) {
             //already prepared i.e. 2pc
@@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase
 
             store->commit(*txn);
             txn.reset();
-            
+
             std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
             return true;
         } else {
             //1pc commit optimisation, don't need a 2pc transaction context:
             if (!onePhase) {
-                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));        
+                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
             }
             std::auto_ptr<TransactionContext> localtxn = store->begin();
             if (prepare(localtxn.get())) {
@@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase
 
 void DtxWorkRecord::rollback()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     check();
     abort();
 }
 
 void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (expired) {
-        throw DtxTimeoutException();
+        throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
     }
     if (completed) {
         throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
@@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_pt
 
 void DtxWorkRecord::timedout()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     expired = true;
     rolledback = true;
     if (!completed) {
@@ -175,3 +175,17 @@ void DtxWorkRecord::timedout()
     }
     abort();
 }
+
+size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
+    Work::iterator i = std::find(work.begin(), work.end(), buf);
+    if (i == work.end()) throw NotFoundException(
+        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
+    return i - work.begin();
+}
+
+DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
+    if (i > work.size())
+        throw NotFoundException(
+            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
+    return work[i];
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org