You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/11/19 13:48:08 UTC

svn commit: r1411155 [2/6] - in /qpid/branches/java-broker-config-qpid-4390/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/ cpp/bindings/qpid/dotnet/msvc9/ cpp/bindings/qpid/dotnet/s...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp Mon Nov 19 12:47:53 2012
@@ -121,28 +121,30 @@ void Bridge::create(Connection& c)
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
     } else {
-        FieldTable queueSettings;
+        if (!useExistingQueue) {
+            FieldTable queueSettings;
 
-        if (args.i_tag.size()) {
-            queueSettings.setString("qpid.trace.id", args.i_tag);
-        } else {
-            const string& peerTag = c.getFederationPeerTag();
-            if (peerTag.size())
-                queueSettings.setString("qpid.trace.id", peerTag);
+            if (args.i_tag.size()) {
+                queueSettings.setString("qpid.trace.id", args.i_tag);
+            } else {
+                const string& peerTag = c.getFederationPeerTag();
+                if (peerTag.size())
+                    queueSettings.setString("qpid.trace.id", peerTag);
+            }
+
+            if (args.i_excludes.size()) {
+                queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+            } else {
+                const string& localTag = link->getBroker()->getFederationTag();
+                if (localTag.size())
+                    queueSettings.setString("qpid.trace.exclude", localTag);
+            }
+
+            bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
+            bool exclusive = true;  // only exclusive if the queue is owned by the bridge
+            bool autoDelete = exclusive && !durable;//auto delete transient queues?
+            peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
         }
-
-        if (args.i_excludes.size()) {
-            queueSettings.setString("qpid.trace.exclude", args.i_excludes);
-        } else {
-            const string& localTag = link->getBroker()->getFederationTag();
-            if (localTag.size())
-                queueSettings.setString("qpid.trace.exclude", localTag);
-        }
-
-        bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
-        bool exclusive = !useExistingQueue;  // only exclusive if the queue is owned by the bridge
-        bool autoDelete = exclusive && !durable;//auto delete transient queues?
-        peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
         if (!args.i_dynamic)
             peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
         peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h Mon Nov 19 12:47:53 2012
@@ -65,6 +65,7 @@ class Bridge : public PersistableConfig,
 
     QPID_BROKER_EXTERN void close();
     bool isDurable() { return args.i_durable; }
+    framing::ChannelId getChannel() const { return channel; }
     Link *getLink() const { return link; }
     const std::string getSrc() const { return args.i_src; }
     const std::string getDest() const { return args.i_dest; }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp Mon Nov 19 12:47:53 2012
@@ -48,8 +48,6 @@
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventUnbind.h"
 #include "qpid/amqp_0_10/Codecs.h"
@@ -146,6 +144,7 @@ Broker::Options::Options(const std::stri
         ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
         ("no-data-dir", optValue(noDataDir), "Don't use a data directory.  No persistent configuration will be loaded or stored")
         ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+        ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
         ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -1112,21 +1111,12 @@ std::pair<boost::shared_ptr<Queue>, bool
         if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
     }
 
-    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+    std::pair<Queue::shared_ptr, bool> result =
+        queues.declare(name, settings, alternate, false/*recovering*/,
+                       owner, connectionId, userId);
     if (result.second) {
         //add default binding:
         result.first->bind(exchanges.getDefault(), name);
-
-        if (managementAgent.get()) {
-            //TODO: debatable whether we should raise an event here for
-            //create when this is a 'declare' event; ideally add a create
-            //event instead?
-            managementAgent->raiseEvent(
-                _qmf::EventQueueDeclare(connectionId, userId, name,
-                                        settings.durable, owner, settings.autodelete, alternateExchange,
-                                        settings.asMap(),
-                                        "created"));
-        }
         QPID_LOG_CAT(debug, model, "Create queue. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
@@ -1150,17 +1140,14 @@ void Broker::deleteQueue(const std::stri
         if (check) check(queue);
         if (acl)
             acl->recordDestroyQueue(name);
-        queues.destroy(name);
+        queues.destroy(name, connectionId, userId);
         queue->destroyed();
     } else {
         throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
     }
-
-    if (managementAgent.get())
-        managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
     QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
-        << " user:" << userId
-        << " rhost:" << connectionId
+                 << " user:" << userId
+                 << " rhost:" << connectionId
     );
 
 }
@@ -1190,29 +1177,12 @@ std::pair<Exchange::shared_ptr, bool> Br
     }
 
     std::pair<Exchange::shared_ptr, bool> result;
-    result = exchanges.declare(name, type, durable, arguments);
+    result = exchanges.declare(
+        name, type, durable, arguments, alternate, connectionId, userId);
     if (result.second) {
-        if (alternate) {
-            result.first->setAlternate(alternate);
-            alternate->incAlternateUsers();
-        }
         if (durable) {
             store->create(*result.first, arguments);
         }
-        if (managementAgent.get()) {
-            //TODO: debatable whether we should raise an event here for
-            //create when this is a 'declare' event; ideally add a create
-            //event instead?
-            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
-                                                         userId,
-                                                         name,
-                                                         type,
-                                                         alternateExchange,
-                                                         durable,
-                                                         false,
-                                                         ManagementAgent::toMap(arguments),
-                                                         "created"));
-        }
         QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
             << " user:" << userId
             << " rhost:" << connectionId
@@ -1239,10 +1209,7 @@ void Broker::deleteExchange(const std::s
     if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
     if (exchange->isDurable()) store->destroy(*exchange);
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    exchanges.destroy(name);
-
-    if (managementAgent.get())
-        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+    exchanges.destroy(name, connectionId,  userId);
     QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
         << " user:" << userId
         << " rhost:" << connectionId);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h Mon Nov 19 12:47:53 2012
@@ -103,6 +103,7 @@ class Broker : public sys::Runnable, pub
         bool noDataDir;
         std::string dataDir;
         uint16_t port;
+        std::vector<std::string> listenInterfaces;
         int workerThreads;
         int connectionBacklog;
         bool enableMgmt;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h Mon Nov 19 12:47:53 2012
@@ -38,6 +38,10 @@ class Exchange;
 
 /**
  * Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
  */
 class ConfigurationObserver
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Nov 19 12:47:53 2012
@@ -408,5 +408,10 @@ bool Exchange::routeWithAlternate(Delive
     return msg.delivered;
 }
 
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+    args = newArgs;
+    if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
 }}
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h Mon Nov 19 12:47:53 2012
@@ -173,8 +173,8 @@ public:
 
     const std::string& getName() const { return name; }
     bool isDurable() { return durable; }
-    qpid::framing::FieldTable& getArgs() { return args; }
-    const qpid::framing::FieldTable& getArgs() const { return args; }
+    QPID_BROKER_EXTERN const qpid::framing::FieldTable& getArgs() const { return args; }
+    QPID_BROKER_EXTERN void setArgs(const framing::FieldTable&);
 
     QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
     QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -29,20 +29,26 @@
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::pair;
 using std::string;
 using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
 
 pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
 
     return declare(name, type, false, FieldTable());
 }
 
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
-                                                           bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+    const string& name, const string& type, bool durable, const FieldTable& args,
+    Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
     Exchange::shared_ptr exchange;
     std::pair<Exchange::shared_ptr, bool> result;
     {
@@ -73,31 +79,58 @@ pair<Exchange::shared_ptr, bool> Exchang
             }
             exchanges[name] = exchange;
             result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+            if (alternate) {
+                exchange->setAlternate(alternate);
+                alternate->incAlternateUsers();
+            }
+            // Call exchangeCreate inside the lock to ensure correct ordering.
+            if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
         } else {
             result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
         }
+        if (broker && broker->getManagementAgent()) {
+            // Call raiseEvent inside the lock to ensure correct ordering.
+            broker->getManagementAgent()->raiseEvent(
+                _qmf::EventExchangeDeclare(
+                    connectionId,
+                    userId,
+                    name,
+                    type,
+                    alternate ? alternate->getName() : string(),
+                    durable,
+                    false,
+                    ManagementAgent::toMap(result.first->getArgs()),
+                    "created"));
+        }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
     return result;
 }
 
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+    const string& name, const string& connectionId, const string& userId)
+{
     if (name.empty() ||
         (name.find("amq.") == 0 &&
          (name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
         name == "qpid.management")
         throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
-    Exchange::shared_ptr exchange;
     {
         RWlock::ScopedWlock locker(lock);
         ExchangeMap::iterator i =  exchanges.find(name);
         if (i != exchanges.end()) {
-            exchange = i->second;
+            if (broker) {
+                // Call exchangeDestroy and raiseEvent inside the lock to ensure
+                // correct ordering.
+                broker->getConfigurationObservers().exchangeDestroy(i->second);
+                if (broker->getManagementAgent())
+                    broker->getManagementAgent()->raiseEvent(
+                        _qmf::EventExchangeDelete(connectionId, userId, name));
+            }
             i->second->destroy();
             exchanges.erase(i);
+
         }
     }
-    if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
 }
 
 Exchange::shared_ptr ExchangeRegistry::find(const string& name){

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Mon Nov 19 12:47:53 2012
@@ -46,14 +46,23 @@ class ExchangeRegistry{
                              bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
 
     ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name, const std::string& type);
-    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
-      (const std::string& name,
-       const std::string& type, 
-       bool durable,
-       const qpid::framing::FieldTable& args = framing::FieldTable());
-    QPID_BROKER_EXTERN void destroy(const std::string& name);
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name, const std::string& type);
+
+    QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const qpid::framing::FieldTable& args = framing::FieldTable(),
+        Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
+    QPID_BROKER_EXTERN void destroy(
+        const std::string& name,
+        const std::string& connectionId = std::string(),
+        const std::string& userId = std::string());
+
     QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
 
     /**

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp Mon Nov 19 12:47:53 2012
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/amqp_types.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/UrlArray.h"
@@ -148,7 +149,8 @@ Link::Link(const string&  _name,
       currentInterval(1),
       closing(false),
       reconnectNext(0),         // Index of next address for reconnecting in url.
-      channelCounter(1),
+      nextFreeChannel(1),
+      freeChannels(1, framing::CHANNEL_MAX),
       connection(0),
       agent(0),
       listener(l),
@@ -542,12 +544,41 @@ bool Link::hideManagement() const {
     return !mgmtObject || ( broker && broker->isInCluster());
 }
 
-uint Link::nextChannel()
+// Allocate channel from link free pool
+framing::ChannelId Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
-    if (channelCounter >= framing::CHANNEL_MAX)
-        channelCounter = 1;
-    return channelCounter++;
+    if (!freeChannels.empty()) {
+        // A free channel exists.
+        for (framing::ChannelId i = 1; i <= framing::CHANNEL_MAX; i++)
+        {
+            // extract proposed free channel
+            framing::ChannelId c = nextFreeChannel;
+            // calculate next free channel
+            if (framing::CHANNEL_MAX == nextFreeChannel)
+                nextFreeChannel = 1;
+            else
+                nextFreeChannel += 1;
+            // if proposed channel is free, use it
+            if (freeChannels.contains(c))
+            {
+                freeChannels -= c;
+                QPID_LOG(debug, "Link " << name << " allocates channel: " << c);
+                return c;
+            }
+        }
+        assert (false);
+    }
+
+    throw Exception(Msg() << "Link " << name << " channel pool is empty");
+}
+
+// Return channel to link free pool
+void Link::returnChannel(framing::ChannelId c)
+{
+    Mutex::ScopedLock mutex(lock);
+    QPID_LOG(debug, "Link " << name << " frees channel: " << c);
+    freeChannels += c;
 }
 
 void Link::notifyConnectionForced(const string text)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h Mon Nov 19 12:47:53 2012
@@ -82,7 +82,8 @@ class Link : public PersistableConfig, p
     Bridges created;   // Bridges pending creation
     Bridges active;    // Bridges active
     Bridges cancellations;    // Bridges pending cancellation
-    uint channelCounter;
+    framing::ChannelId nextFreeChannel;
+    RangeSet<framing::ChannelId> freeChannels;
     Connection* connection;
     management::ManagementAgent* agent;
     boost::function<void(Link*)> listener;
@@ -151,7 +152,8 @@ class Link : public PersistableConfig, p
 
     bool isDurable() { return durable; }
     void maintenanceVisit ();
-    uint nextChannel();
+    framing::ChannelId nextChannel();        // allocate channel from link free pool
+    void returnChannel(framing::ChannelId);  // return channel to link free pool
     void add(Bridge::shared_ptr);
     void cancel(Bridge::shared_ptr);
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -254,6 +254,7 @@ void LinkRegistry::destroyBridge(Bridge 
     Link *link = b->second->getLink();
     if (link) {
         link->cancel(b->second);
+        link->returnChannel( bridge->getChannel() );
     }
     if (b->second->isDurable())
         store->destroy(*(b->second));

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 19 12:47:53 2012
@@ -1169,14 +1169,10 @@ void tryAutoDeleteImpl(Broker& broker, Q
 {
     if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
-        QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->destroyed();
-
-        if (broker.getManagementAgent())
-            broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
-        QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+        QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
             << " user:" << userId
             << " rhost:" << connectionId );
+        queue->destroyed();
     }
 }
 
@@ -1598,5 +1594,10 @@ void Queue::UsageBarrier::destroy()
     while (count) usageLock.wait();
 }
 
+void Queue::addArgument(const string& key, const types::Variant& value) {
+    settings.original.insert(types::Variant::Map::value_type(key, value));
+    if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
+}
+
 }}
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h Mon Nov 19 12:47:53 2012
@@ -145,7 +145,7 @@ class Queue : public boost::enable_share
     mutable qpid::sys::Mutex messageLock;
     mutable qpid::sys::Mutex ownershipLock;
     mutable uint64_t persistenceId;
-    const QueueSettings settings;
+    QueueSettings settings;
     qpid::framing::FieldTable encodableSettings;
     QueueDepth current;
     QueueBindings bindings;
@@ -423,6 +423,10 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+
+    /** Add an argument to be included in management messages about this queue. */
+    QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value);
+
   friend class QueueFactory;
 };
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -23,10 +23,14 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include <sstream>
 #include <assert.h>
 
+namespace _qmf = qmf::org::apache::qpid::broker;
 using namespace qpid::broker;
 using namespace qpid::sys;
 using std::string;
@@ -44,7 +48,10 @@ QueueRegistry::declare(const string& nam
                        bool recovering/*true if this declare is a
                                         result of recovering queue
                                         definition from persistent
-                                        record*/)
+                                        record*/,
+                       const OwnershipToken* owner,
+                       std::string connectionId,
+                       std::string userId)
 {
     std::pair<Queue::shared_ptr, bool> result;
     {
@@ -62,16 +69,31 @@ QueueRegistry::declare(const string& nam
                 queue->create();
             }
             queues[name] = queue;
+            // NOTE: raiseEvent and queueCreate must be called with the lock held in
+            // order to ensure events are generated in the correct order.
+            // Call queueCreate before raiseEvents so it can add arguments that
+            // will be included in the management event.
+            if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
             result = std::pair<Queue::shared_ptr, bool>(queue, true);
         } else {
             result = std::pair<Queue::shared_ptr, bool>(i->second, false);
         }
+            if (getBroker() && getBroker()->getManagementAgent()) {
+            getBroker()->getManagementAgent()->raiseEvent(
+                _qmf::EventQueueDeclare(
+                    connectionId, userId, name,
+                    settings.durable, owner, settings.autodelete,
+                    alternate ? alternate->getName() : string(),
+                    result.first->getSettings().asMap(),
+                    result.second ? "created" : "existing"));
+        }
     }
-    if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
     return result;
 }
 
-void QueueRegistry::destroy(const string& name) {
+void QueueRegistry::destroy(
+    const string& name, const string& connectionId, const string& userId)
+{
     Queue::shared_ptr q;
     {
         qpid::sys::RWlock::ScopedWlock locker(lock);
@@ -79,9 +101,17 @@ void QueueRegistry::destroy(const string
         if (i != queues.end()) {
             q = i->second;
             queues.erase(i);
+            if (getBroker()) {
+                // NOTE: queueDestroy and raiseEvent must be called with the
+                // lock held in order to ensure events are generated
+                // in the correct order.
+                getBroker()->getConfigurationObservers().queueDestroy(q);
+                if (getBroker()->getManagementAgent())
+                    getBroker()->getManagementAgent()->raiseEvent(
+                        _qmf::EventQueueDelete(connectionId, userId, name));
+            }
         }
     }
-    if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Nov 19 12:47:53 2012
@@ -59,7 +59,9 @@ class QueueRegistry : QueueFactory {
         const std::string& name,
         const QueueSettings& settings,
         boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
-        bool recovering = false);
+        bool recovering = false,
+        const OwnershipToken* owner = 0,
+        std::string connectionId=std::string(), std::string userId=std::string());
 
     /**
      * Destroy the named queue.
@@ -73,7 +75,11 @@ class QueueRegistry : QueueFactory {
      * subsequent calls to find or declare with the same name.
      *
      */
-    QPID_BROKER_EXTERN void destroy(const std::string& name);
+    QPID_BROKER_EXTERN void destroy(
+        const std::string& name,
+        const std::string& connectionId=std::string(),
+        const std::string& userId=std::string());
+
     template <class Test> bool destroyIf(const std::string& name, Test test)
     {
         if (test()) {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Nov 19 12:47:53 2012
@@ -432,9 +432,9 @@ void SemanticState::cancel(ConsumerImpl:
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
-        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+        // Only run auto-delete for counted consumers.
+        if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner())
             Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
-        }
     }
     c->cancel();
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Nov 19 12:47:53 2012
@@ -98,17 +98,6 @@ void SessionAdapter::ExchangeHandlerImpl
                 //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
-                ManagementAgent* agent = getBroker().getManagementAgent();
-                if (agent)
-                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
-                                                                 getConnection().getUserId(),
-                                                                 exchange,
-                                                                 type,
-                                                                 alternateExchange,
-                                                                 durable,
-                                                                 false,
-                                                                 ManagementAgent::toMap(args),
-                                                                 "existing"));
                 QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
                     << " user:" << getConnection().getUserId()
                     << " rhost:" << getConnection().getUrl()
@@ -318,11 +307,6 @@ void SessionAdapter::QueueHandlerImpl::d
             if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                          name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
-                                                          "existing"));
             QPID_LOG_CAT(debug, model, "Create queue. name:" << name
                 << " user:" << getConnection().getUserId()
                 << " rhost:" << getConnection().getUrl()

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Nov 19 12:47:53 2012
@@ -22,6 +22,7 @@
 #include "qpid/broker/amqp/Header.h"
 #include "qpid/broker/amqp/Translation.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/log/Statement.h"
@@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker
     return canDeliver();
 }
 
+void Outgoing::setSubjectFilter(const std::string& f)
+{
+    subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target)
+{
+    bool wild = false;
+    while (!filter.finished())
+    {
+        if (filter.match1('*')) {
+            if (target.finished()) return false;
+            //else move to next word in filter target
+            filter.next();
+            target.next();
+        } else if (filter.match1('#')) {
+            // i.e. filter word is '#' which can match a variable number of words in the target
+            filter.next();
+            if (filter.finished()) return true;
+            else if (target.finished()) return false;
+            wild = true;
+        } else {
+            //filter word needs to match target exactly
+            if (target.finished()) return false;
+            std::string word;
+            target.pop(word);
+            if (filter.match(word)) {
+                wild = false;
+                filter.next();
+            } else if (!wild) {
+                return false;
+            }
+        }
+    }
+    return target.finished();
+}
+bool match(const std::string& filter, const std::string& target)
+{
+    TokenIterator lhs(filter);
+    TokenIterator rhs(target);
+    return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m)
+{
+    return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey());
+}
+
 void Outgoing::cancel() {}
 
 void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Mon Nov 19 12:47:53 2012
@@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Co
 {
   public:
     Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+    void setSubjectFilter(const std::string&);
     void init();
     bool dispatch();
     void write(const char* data, size_t size);
@@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Co
     bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
     void notify();
     bool accept(const qpid::broker::Message&);
+    bool filter(const qpid::broker::Message&);
     void cancel();
     void acknowledged(const qpid::broker::DeliveryRecord&);
     qpid::broker::OwnershipToken* getSession();
@@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Co
     size_t current;
     int outstanding;
     std::vector<char> buffer;
+    std::string subjectFilter;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Nov 19 12:47:53 2012
@@ -26,11 +26,16 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/NodeProperties.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include <boost/intrusive_ptr.hpp>
@@ -79,6 +84,31 @@ class Exchange : public Target
 Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
     : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
 
+
+Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus)
+{
+    ResolvedNode node;
+    node.exchange = broker.getExchanges().find(name);
+    node.queue = broker.getQueues().find(name);
+    if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) {
+        //TODO: handle dynamic creation
+        //is it a queue or an exchange?
+        NodeProperties properties;
+        properties.read(pn_terminus_properties(terminus));
+        if (properties.isQueue()) {
+            node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+        } else {
+            qpid::framing::FieldTable args;
+            node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
+                                                  args, connection.getUserid(), connection.getId()).first;
+        }
+    } else if (node.queue && node.exchange) {
+        QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
+        node.exchange.reset();
+    }
+    return node;
+}
+
 void Session::attach(pn_link_t* link)
 {
     if (pn_link_is_sender(link)) {
@@ -91,32 +121,36 @@ void Session::attach(pn_link_t* link)
         QPID_LOG(debug, "Received attach request for outgoing link from " << name);
         pn_terminus_set_address(pn_link_source(link), name.c_str());
 
-        boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
-        boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
-        if (queue) {
-            if (exchange) {
-                QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
-            }
-            boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, false));
+        ResolvedNode node = resolve(name, source);
+        Filter filter;
+        filter.read(pn_terminus_filter(source));
+
+        if (node.queue) {
+            boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
             q->init();
+            if (filter.hasSubjectFilter()) {
+                q->setSubjectFilter(filter.getSubjectFilter());
+            }
             senders[link] = q;
-        } else if (exchange) {
+        } else if (node.exchange) {
             QueueSettings settings(false, true);
             //TODO: populate settings from source details when available from engine
-            queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
-            //TODO: bind based on filter when that is exposed by engine
-            if (exchange->getType() == FanOutExchange::typeName) {
-                exchange->bind(queue, std::string(), 0);
-            } else if (exchange->getType() == TopicExchange::typeName) {
-                exchange->bind(queue, "#", 0);
+            boost::shared_ptr<qpid::broker::Queue> queue
+                = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+            if (filter.hasSubjectFilter()) {
+                filter.bind(node.exchange, queue);
+                filter.write(pn_terminus_filter(pn_link_source(link)));
+            } else if (node.exchange->getType() == FanOutExchange::typeName) {
+                node.exchange->bind(queue, std::string(), 0);
+            } else if (node.exchange->getType() == TopicExchange::typeName) {
+                node.exchange->bind(queue, "#", 0);
             } else {
-                throw qpid::Exception("Exchange type not yet supported over 1.0: " + exchange->getType());/*not-supported?*/
+                throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
             }
             boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
             senders[link] = q;
             q->init();
         } else {
-            //TODO: handle dynamic creation
             pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
             throw qpid::Exception("Node not found: " + name);/*not-found*/
         }
@@ -130,22 +164,17 @@ void Session::attach(pn_link_t* link)
         QPID_LOG(debug, "Received attach request for incoming link to " << name);
         pn_terminus_set_address(pn_link_target(link), name.c_str());
 
+        ResolvedNode node = resolve(name, target);
 
-        boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
-        boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
-        if (queue) {
-            if (exchange) {
-                QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
-            }
-            boost::shared_ptr<Target> q(new Queue(queue, link));
+        if (node.queue) {
+            boost::shared_ptr<Target> q(new Queue(node.queue, link));
             targets[link] = q;
             q->flow();
-        } else if (exchange) {
-            boost::shared_ptr<Target> e(new Exchange(exchange, link));
+        } else if (node.exchange) {
+            boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
             targets[link] = e;
             e->flow();
         } else {
-            //TODO: handle dynamic creation
             pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
             throw qpid::Exception("Node not found: " + name);/*not-found*/
         }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h Mon Nov 19 12:47:53 2012
@@ -32,11 +32,14 @@
 struct pn_delivery_t;
 struct pn_link_t;
 struct pn_session_t;
+struct pn_terminus_t;
 
 namespace qpid {
 namespace broker {
 
 class Broker;
+class Exchange;
+class Queue;
 
 namespace amqp {
 
@@ -71,6 +74,13 @@ class Session : public ManagedSession, p
     std::deque<pn_delivery_t*> completed;
     bool deleted;
     qpid::sys::Mutex lock;
+    struct ResolvedNode
+    {
+        boost::shared_ptr<qpid::broker::Exchange> exchange;
+        boost::shared_ptr<qpid::broker::Queue> queue;
+    };
+
+    ResolvedNode resolve(const std::string name, pn_terminus_t* terminus);
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Mon Nov 19 12:47:53 2012
@@ -93,9 +93,7 @@ class SslProtocolFactory : public qpid::
     CredHandle credHandle;
 
   public:
-    SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port,
-                       int backlog, bool nodelay,
-                       Timer& timer, uint32_t maxTime);
+    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
     ~SslProtocolFactory();
     void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
     void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
@@ -129,10 +127,7 @@ static struct SslPlugin : public Plugin 
         if (broker) {
             try {
                 const broker::Broker::Options& opts = broker->getOptions();
-                ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
-                                                                            "", boost::lexical_cast<std::string>(options.port),
-                                                                            opts.connectionBacklog, opts.tcpNoDelay,
-                                                                            broker->getTimer(), opts.maxNegotiateTime));
+                ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
                 QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
                 broker->registerProtocolFactory("ssl", protocol);
             } catch (const std::exception& e) {
@@ -142,13 +137,36 @@ static struct SslPlugin : public Plugin 
     }
 } sslPlugin;
 
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
-                                       const std::string& host, const std::string& port,
-                                       int backlog, bool nodelay,
-                                       Timer& timer, uint32_t maxTime)
+namespace {
+    // Expand list of Interfaces and addresses to a list of addresses
+    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+        std::vector<std::string> addresses;
+        // If there are no specific interfaces listed use a single "" to listen on every interface
+        if (interfaces.empty()) {
+            addresses.push_back("");
+            return addresses;
+        }
+        for (unsigned i = 0; i < interfaces.size(); ++i) {
+            const std::string& interface = interfaces[i];
+            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
+                // We don't have an interface of that name -
+                // Check for IPv6 ('[' ']') brackets and remove them
+                // then pass to be looked up directly
+                if (interface[0]=='[' && interface[interface.size()-1]==']') {
+                    addresses.push_back(interface.substr(1, interface.size()-2));
+                } else {
+                    addresses.push_back(interface);
+                }
+            }
+        }
+        return addresses;
+    }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
     : brokerTimer(timer),
-      maxNegotiateTime(maxTime),
-      tcpNoDelay(nodelay),
+      maxNegotiateTime(opts.maxNegotiateTime),
+      tcpNoDelay(opts.tcpNoDelay),
       clientAuthSelected(options.clientAuth) {
 
     // Make sure that certificate store is good before listening to sockets
@@ -158,7 +176,7 @@ SslProtocolFactory::SslProtocolFactory(c
     // Get the certificate for this server.
     DWORD flags = 0;
     std::string certStoreLocation = options.certStoreLocation;
-    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
     if (certStoreLocation == "currentuser") {
         flags = CERT_SYSTEM_STORE_CURRENT_USER;
     } else if (certStoreLocation == "localmachine") {
@@ -212,21 +230,31 @@ SslProtocolFactory::SslProtocolFactory(c
     ::CertFreeCertificateContext(certContext);
     ::CertCloseStore(certStoreHandle, 0);
 
-    // Listen to socket(s)
-    SocketAddress sa(host, port);
+    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+    if (addresses.empty()) {
+        // We specified some interfaces, but couldn't find addresses for them
+        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+        listeningPort = 0;
+    }
+
+    for (unsigned i = 0; i<addresses.size(); ++i) {
+        QPID_LOG(debug, "Using interface: " << addresses[i]);
+        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
 
-    // We must have at least one resolved address
-    QPID_LOG(info, "SSL Listening to: " << sa.asString())
-    Socket* s = new Socket;
-    listeningPort = s->listen(sa, backlog);
-    listeners.push_back(s);
 
-    // Try any other resolved addresses
-    while (sa.nextAddress()) {
+        // We must have at least one resolved address
         QPID_LOG(info, "SSL Listening to: " << sa.asString())
-        Socket* s = new Socket;
-        s->listen(sa, backlog);
+        Socket* s = createSocket();
+        listeningPort = s->listen(sa, opts.connectionBacklog);
         listeners.push_back(s);
+
+        // Try any other resolved addresses
+        while (sa.nextAddress()) {
+            QPID_LOG(info, "SSL Listening to: " << sa.asString())
+            Socket* s = createSocket();
+            s->listen(sa, opts.connectionBacklog);
+            listeners.push_back(s);
+        }
     }
 }
 
@@ -245,7 +273,7 @@ void SslProtocolFactory::established(sys
                                      const qpid::sys::Socket& s,
                                      sys::ConnectionCodec::Factory* f,
                                      bool isClient) {
-    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f);
+    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false);
 
     if (tcpNoDelay) {
         s.setTcpNoDelay();
@@ -325,7 +353,7 @@ void SslProtocolFactory::connect(sys::Po
     // upon connection failure or by the AsynchIO upon connection
     // shutdown.  The allocated AsynchConnector frees itself when it
     // is no longer needed.
-    qpid::sys::Socket* socket = new qpid::sys::Socket();
+    qpid::sys::Socket* socket = createSocket();
     connectFailedCallback = failed;
     AsynchConnector::create(*socket,
                             host,

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp Mon Nov 19 12:47:53 2012
@@ -30,8 +30,9 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/InitiationHandler.h"
 #include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/SecuritySettings.h"
@@ -72,20 +73,23 @@ class SslConnector : public Connector
 
     sys::ssl::SslSocket socket;
 
-    sys::ssl::SslIO* aio;
+    sys::AsynchConnector* connector;
+    sys::AsynchIO* aio;
     std::string identifier;
     Poller::shared_ptr poller;
     SecuritySettings securitySettings;
 
     ~SslConnector();
 
-    void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
-    void writebuff(qpid::sys::ssl::SslIO&);
+    void readbuff(AsynchIO&, AsynchIOBufferBase*);
+    void writebuff(AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
-    void eof(qpid::sys::ssl::SslIO&);
-    void disconnected(qpid::sys::ssl::SslIO&);
+    void eof(AsynchIO&);
+    void disconnected(AsynchIO&);
 
     void connect(const std::string& host, const std::string& port);
+    void connected(const sys::Socket&);
+    void connectFailed(const std::string& msg);
     void close();
     void send(framing::AMQFrame& frame);
     void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -96,7 +100,7 @@ class SslConnector : public Connector
     framing::OutputHandler* getOutputHandler();
     const std::string& getIdentifier() const;
     const SecuritySettings* getSecuritySettings();
-    void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+    void socketClosed(AsynchIO&, const Socket&);
 
     size_t decode(const char* buffer, size_t size);
     size_t encode(char* buffer, size_t size);
@@ -164,24 +168,28 @@ SslConnector::~SslConnector() {
     close();
 }
 
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, const std::string& port) {
     Mutex::ScopedLock l(lock);
     assert(closed);
-    try {
-        socket.connect(host, port);
-    } catch (const std::exception& e) {
-        socket.close();
-        throw TransportFailure(e.what());
-    }
-
+    connector = AsynchConnector::create(
+        socket,
+        host, port,
+        boost::bind(&SslConnector::connected, this, _1),
+        boost::bind(&SslConnector::connectFailed, this, _3));
     closed = false;
-    aio = new SslIO(socket,
-                       boost::bind(&SslConnector::readbuff, this, _1, _2),
-                       boost::bind(&SslConnector::eof, this, _1),
-                       boost::bind(&SslConnector::disconnected, this, _1),
-                       boost::bind(&SslConnector::socketClosed, this, _1, _2),
-                       0, // nobuffs
-                       boost::bind(&SslConnector::writebuff, this, _1));
+
+    connector->start(poller);
+}
+
+void SslConnector::connected(const Socket&) {
+    connector = 0;
+    aio = AsynchIO::create(socket,
+                           boost::bind(&SslConnector::readbuff, this, _1, _2),
+                           boost::bind(&SslConnector::eof, this, _1),
+                           boost::bind(&SslConnector::disconnected, this, _1),
+                           boost::bind(&SslConnector::socketClosed, this, _1, _2),
+                           0, // nobuffs
+                           boost::bind(&SslConnector::writebuff, this, _1));
 
     aio->createBuffers(maxFrameSize);
     identifier = str(format("[%1%]") % socket.getFullAddress());
@@ -190,6 +198,16 @@ void SslConnector::connect(const std::st
     aio->start(poller);
 }
 
+void SslConnector::connectFailed(const std::string& msg) {
+    connector = 0;
+    QPID_LOG(warning, "Connect failed: " << msg);
+    socket.close();
+    if (!closed)
+        closed = true;
+    if (shutdownHandler)
+        shutdownHandler->shutdown();
+}
+
 void SslConnector::close() {
     Mutex::ScopedLock l(lock);
     if (!closed) {
@@ -199,7 +217,7 @@ void SslConnector::close() {
     }
 }
 
-void SslConnector::socketClosed(SslIO&, const SslSocket&) {
+void SslConnector::socketClosed(AsynchIO&, const Socket&) {
     if (aio)
         aio->queueForDeletion();
     if (shutdownHandler)
@@ -255,7 +273,7 @@ void SslConnector::send(AMQFrame& frame)
     }
 }
 
-void SslConnector::writebuff(SslIO& /*aio*/)
+void SslConnector::writebuff(AsynchIO& /*aio*/)
 {
     // It's possible to be disconnected and be writable
     if (closed)
@@ -265,7 +283,7 @@ void SslConnector::writebuff(SslIO& /*ai
         return;
     }
 
-    SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+    AsynchIOBufferBase* buffer = aio->getQueuedBuffer();
     if (buffer) {
 
         size_t encoded = encode(buffer->bytes, buffer->byteCount);
@@ -304,7 +322,7 @@ size_t SslConnector::encode(char* buffer
     return bytesWritten;
 }
 
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff)
 {
     int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
     // TODO: unreading needs to go away, and when we can cope
@@ -343,7 +361,7 @@ size_t SslConnector::decode(const char* 
 }
 
 void SslConnector::writeDataBlock(const AMQDataBlock& data) {
-    SslIO::BufferBase* buff = aio->getQueuedBuffer();
+    AsynchIOBufferBase* buff = aio->getQueuedBuffer();
     assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
@@ -351,11 +369,11 @@ void SslConnector::writeDataBlock(const 
     aio->queueWrite(buff);
 }
 
-void SslConnector::eof(SslIO&) {
+void SslConnector::eof(AsynchIO&) {
     close();
 }
 
-void SslConnector::disconnected(SslIO&) {
+void SslConnector::disconnected(AsynchIO&) {
     close();
     socketClosed(*aio, socket);
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp Mon Nov 19 12:47:53 2012
@@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::share
       closed(true),
       shutdownHandler(0),
       input(0),
+      socket(createSocket()),
       connector(0),
       aio(0),
       poller(p)
 {
     QPID_LOG(debug, "TCPConnector created for " << version);
-    settings.configureSocket(socket);
+    settings.configureSocket(*socket);
 }
 
 TCPConnector::~TCPConnector() {
@@ -88,7 +89,7 @@ void TCPConnector::connect(const std::st
     Mutex::ScopedLock l(lock);
     assert(closed);
     connector = AsynchConnector::create(
-        socket,
+        *socket,
         host, port,
         boost::bind(&TCPConnector::connected, this, _1),
         boost::bind(&TCPConnector::connectFailed, this, _3));
@@ -99,7 +100,7 @@ void TCPConnector::connect(const std::st
 
 void TCPConnector::connected(const Socket&) {
     connector = 0;
-    aio = AsynchIO::create(socket,
+    aio = AsynchIO::create(*socket,
                        boost::bind(&TCPConnector::readbuff, this, _1, _2),
                        boost::bind(&TCPConnector::eof, this, _1),
                        boost::bind(&TCPConnector::disconnected, this, _1),
@@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* 
 
     aio->createBuffers(maxFrameSize);
 
-    identifier = str(format("[%1%]") % socket.getFullAddress());
+    identifier = str(format("[%1%]") % socket->getFullAddress());
 }
 
 void TCPConnector::initAmqp() {
@@ -127,7 +128,7 @@ void TCPConnector::initAmqp() {
 void TCPConnector::connectFailed(const std::string& msg) {
     connector = 0;
     QPID_LOG(warning, "Connect failed: " << msg);
-    socket.close();
+    socket->close();
     if (!closed)
         closed = true;
     if (shutdownHandler)
@@ -318,7 +319,7 @@ void TCPConnector::eof(AsynchIO&) {
 
 void TCPConnector::disconnected(AsynchIO&) {
     close();
-    socketClosed(*aio, socket);
+    socketClosed(*aio, *socket);
 }
 
 void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h Mon Nov 19 12:47:53 2012
@@ -35,7 +35,7 @@
 #include "qpid/sys/Thread.h"
 
 #include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
 #include <deque>
 #include <string>
 
@@ -66,7 +66,7 @@ class TCPConnector : public Connector, p
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
 
-    sys::Socket socket;
+    boost::scoped_ptr<sys::Socket> socket;
 
     sys::AsynchConnector* connector;
     sys::AsynchIO* aio;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp Mon Nov 19 12:47:53 2012
@@ -23,6 +23,7 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/Endian.h"
 #include "qpid/framing/List.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/Msg.h"
 
@@ -43,7 +44,9 @@ void FieldValue::setType(uint8_t type)
         data.reset(new EncodedValue<List>());
     } else if (typeOctet == 0xAA) {
         data.reset(new EncodedValue<Array>());
-    } else {    
+    } else if (typeOctet == 0x48) {
+        data.reset(new UuidData());
+    } else {
         uint8_t lenType = typeOctet >> 4;
         switch(lenType){
           case 0:
@@ -213,9 +216,12 @@ Integer8Value::Integer8Value(int8_t v) :
 Integer16Value::Integer16Value(int16_t v) :
     FieldValue(0x11, new FixedWidthValue<2>(v))
 {}
-UuidValue::UuidValue(const unsigned char* v) :
-    FieldValue(0x48, new FixedWidthValue<16>(v))
-{}
+
+UuidData::UuidData() {}
+UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {}
+bool UuidData::convertsToString() const { return true; }
+std::string UuidData::getString() const { return Uuid(rawOctets()).str(); }
+UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {}
 
 void FieldValue::print(std::ostream& out) const {
     data->print(out);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org