You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/11/19 13:48:08 UTC
svn commit: r1411155 [2/6] - in
/qpid/branches/java-broker-config-qpid-4390/qpid: ./ cpp/ cpp/bindings/
cpp/bindings/qpid/dotnet/
cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/
cpp/bindings/qpid/dotnet/msvc9/ cpp/bindings/qpid/dotnet/s...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp Mon Nov 19 12:47:53 2012
@@ -121,28 +121,30 @@ void Bridge::create(Connection& c)
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
- FieldTable queueSettings;
+ if (!useExistingQueue) {
+ FieldTable queueSettings;
- if (args.i_tag.size()) {
- queueSettings.setString("qpid.trace.id", args.i_tag);
- } else {
- const string& peerTag = c.getFederationPeerTag();
- if (peerTag.size())
- queueSettings.setString("qpid.trace.id", peerTag);
+ if (args.i_tag.size()) {
+ queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
+ }
+
+ if (args.i_excludes.size()) {
+ queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
+ }
+
+ bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
+ bool exclusive = true; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
}
-
- if (args.i_excludes.size()) {
- queueSettings.setString("qpid.trace.exclude", args.i_excludes);
- } else {
- const string& localTag = link->getBroker()->getFederationTag();
- if (localTag.size())
- queueSettings.setString("qpid.trace.exclude", localTag);
- }
-
- bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge
- bool autoDelete = exclusive && !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
peer->getMessage().subscribe(queueName, args.i_dest, (useExistingQueue && args.i_sync) ? 0 : 1, 0, false, "", 0, options);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h Mon Nov 19 12:47:53 2012
@@ -65,6 +65,7 @@ class Bridge : public PersistableConfig,
QPID_BROKER_EXTERN void close();
bool isDurable() { return args.i_durable; }
+ framing::ChannelId getChannel() const { return channel; }
Link *getLink() const { return link; }
const std::string getSrc() const { return args.i_src; }
const std::string getDest() const { return args.i_dest; }
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp Mon Nov 19 12:47:53 2012
@@ -48,8 +48,6 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -146,6 +144,7 @@ Broker::Options::Options(const std::stri
("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+ ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
@@ -1112,21 +1111,12 @@ std::pair<boost::shared_ptr<Queue>, bool
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+ std::pair<Queue::shared_ptr, bool> result =
+ queues.declare(name, settings, alternate, false/*recovering*/,
+ owner, connectionId, userId);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name);
-
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(
- _qmf::EventQueueDeclare(connectionId, userId, name,
- settings.durable, owner, settings.autodelete, alternateExchange,
- settings.asMap(),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1150,17 +1140,14 @@ void Broker::deleteQueue(const std::stri
if (check) check(queue);
if (acl)
acl->recordDestroyQueue(name);
- queues.destroy(name);
+ queues.destroy(name, connectionId, userId);
queue->destroyed();
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
+ << " user:" << userId
+ << " rhost:" << connectionId
);
}
@@ -1190,29 +1177,12 @@ std::pair<Exchange::shared_ptr, bool> Br
}
std::pair<Exchange::shared_ptr, bool> result;
- result = exchanges.declare(name, type, durable, arguments);
+ result = exchanges.declare(
+ name, type, durable, arguments, alternate, connectionId, userId);
if (result.second) {
- if (alternate) {
- result.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
if (durable) {
store->create(*result.first, arguments);
}
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
- userId,
- name,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(arguments),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1239,10 +1209,7 @@ void Broker::deleteExchange(const std::s
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- exchanges.destroy(name);
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+ exchanges.destroy(name, connectionId, userId);
QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h Mon Nov 19 12:47:53 2012
@@ -103,6 +103,7 @@ class Broker : public sys::Runnable, pub
bool noDataDir;
std::string dataDir;
uint16_t port;
+ std::vector<std::string> listenInterfaces;
int workerThreads;
int connectionBacklog;
bool enableMgmt;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ConfigurationObserver.h Mon Nov 19 12:47:53 2012
@@ -38,6 +38,10 @@ class Exchange;
/**
* Observer for changes to configuration (aka wiring)
+ *
+ * NOTE: create and destroy functions are called with
+ * the registry lock held. This is necessary to ensure
+ * they are called in the correct sequence.
*/
class ConfigurationObserver
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Nov 19 12:47:53 2012
@@ -408,5 +408,10 @@ bool Exchange::routeWithAlternate(Delive
return msg.delivered;
}
+void Exchange::setArgs(const framing::FieldTable& newArgs) {
+ args = newArgs;
+ if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
+}
+
}}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h Mon Nov 19 12:47:53 2012
@@ -173,8 +173,8 @@ public:
const std::string& getName() const { return name; }
bool isDurable() { return durable; }
- qpid::framing::FieldTable& getArgs() { return args; }
- const qpid::framing::FieldTable& getArgs() const { return args; }
+ QPID_BROKER_EXTERN const qpid::framing::FieldTable& getArgs() const { return args; }
+ QPID_BROKER_EXTERN void setArgs(const framing::FieldTable&);
QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -29,20 +29,26 @@
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
using std::string;
using qpid::framing::FieldTable;
+using qpid::management::ManagementAgent;
+namespace _qmf = qmf::org::apache::qpid::broker;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
return declare(name, type, false, FieldTable());
}
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type,
- bool durable, const FieldTable& args){
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
+ const string& name, const string& type, bool durable, const FieldTable& args,
+ Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
+{
Exchange::shared_ptr exchange;
std::pair<Exchange::shared_ptr, bool> result;
{
@@ -73,31 +79,58 @@ pair<Exchange::shared_ptr, bool> Exchang
}
exchanges[name] = exchange;
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ if (alternate) {
+ exchange->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ // Call exchangeCreate inside the lock to ensure correct ordering.
+ if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
} else {
result = std::pair<Exchange::shared_ptr, bool>(i->second, false);
}
+ if (broker && broker->getManagementAgent()) {
+ // Call raiseEvent inside the lock to ensure correct ordering.
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDeclare(
+ connectionId,
+ userId,
+ name,
+ type,
+ alternate ? alternate->getName() : string(),
+ durable,
+ false,
+ ManagementAgent::toMap(result.first->getArgs()),
+ "created"));
+ }
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeCreate(exchange);
return result;
}
-void ExchangeRegistry::destroy(const string& name){
+void ExchangeRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
if (name.empty() ||
(name.find("amq.") == 0 &&
(name == "amq.direct" || name == "amq.fanout" || name == "amq.topic" || name == "amq.match")) ||
name == "qpid.management")
throw framing::NotAllowedException(QPID_MSG("Cannot delete default exchange: '" << name << "'"));
- Exchange::shared_ptr exchange;
{
RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
- exchange = i->second;
+ if (broker) {
+ // Call exchangeDestroy and raiseEvent inside the lock to ensure
+ // correct ordering.
+ broker->getConfigurationObservers().exchangeDestroy(i->second);
+ if (broker->getManagementAgent())
+ broker->getManagementAgent()->raiseEvent(
+ _qmf::EventExchangeDelete(connectionId, userId, name));
+ }
i->second->destroy();
exchanges.erase(i);
+
}
}
- if (broker && exchange) broker->getConfigurationObservers().exchangeDestroy(exchange);
}
Exchange::shared_ptr ExchangeRegistry::find(const string& name){
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Mon Nov 19 12:47:53 2012
@@ -46,14 +46,23 @@ class ExchangeRegistry{
bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name, const std::string& type);
- QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare
- (const std::string& name,
- const std::string& type,
- bool durable,
- const qpid::framing::FieldTable& args = framing::FieldTable());
- QPID_BROKER_EXTERN void destroy(const std::string& name);
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name, const std::string& type);
+
+ QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
+ const std::string& name,
+ const std::string& type,
+ bool durable,
+ const qpid::framing::FieldTable& args = framing::FieldTable(),
+ Exchange::shared_ptr alternate = Exchange::shared_ptr(),
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
+ QPID_BROKER_EXTERN void destroy(
+ const std::string& name,
+ const std::string& connectionId = std::string(),
+ const std::string& userId = std::string());
+
QPID_BROKER_EXTERN Exchange::shared_ptr getDefault();
/**
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp Mon Nov 19 12:47:53 2012
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/amqp_types.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Exchange.h"
#include "qpid/UrlArray.h"
@@ -148,7 +149,8 @@ Link::Link(const string& _name,
currentInterval(1),
closing(false),
reconnectNext(0), // Index of next address for reconnecting in url.
- channelCounter(1),
+ nextFreeChannel(1),
+ freeChannels(1, framing::CHANNEL_MAX),
connection(0),
agent(0),
listener(l),
@@ -542,12 +544,41 @@ bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
}
-uint Link::nextChannel()
+// Allocate channel from link free pool
+framing::ChannelId Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
- if (channelCounter >= framing::CHANNEL_MAX)
- channelCounter = 1;
- return channelCounter++;
+ if (!freeChannels.empty()) {
+ // A free channel exists.
+ for (framing::ChannelId i = 1; i <= framing::CHANNEL_MAX; i++)
+ {
+ // extract proposed free channel
+ framing::ChannelId c = nextFreeChannel;
+ // calculate next free channel
+ if (framing::CHANNEL_MAX == nextFreeChannel)
+ nextFreeChannel = 1;
+ else
+ nextFreeChannel += 1;
+ // if proposed channel is free, use it
+ if (freeChannels.contains(c))
+ {
+ freeChannels -= c;
+ QPID_LOG(debug, "Link " << name << " allocates channel: " << c);
+ return c;
+ }
+ }
+ assert (false);
+ }
+
+ throw Exception(Msg() << "Link " << name << " channel pool is empty");
+}
+
+// Return channel to link free pool
+void Link::returnChannel(framing::ChannelId c)
+{
+ Mutex::ScopedLock mutex(lock);
+ QPID_LOG(debug, "Link " << name << " frees channel: " << c);
+ freeChannels += c;
}
void Link::notifyConnectionForced(const string text)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h Mon Nov 19 12:47:53 2012
@@ -82,7 +82,8 @@ class Link : public PersistableConfig, p
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
Bridges cancellations; // Bridges pending cancellation
- uint channelCounter;
+ framing::ChannelId nextFreeChannel;
+ RangeSet<framing::ChannelId> freeChannels;
Connection* connection;
management::ManagementAgent* agent;
boost::function<void(Link*)> listener;
@@ -151,7 +152,8 @@ class Link : public PersistableConfig, p
bool isDurable() { return durable; }
void maintenanceVisit ();
- uint nextChannel();
+ framing::ChannelId nextChannel(); // allocate channel from link free pool
+ void returnChannel(framing::ChannelId); // return channel to link free pool
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -254,6 +254,7 @@ void LinkRegistry::destroyBridge(Bridge
Link *link = b->second->getLink();
if (link) {
link->cancel(b->second);
+ link->returnChannel( bridge->getChannel() );
}
if (b->second->isDurable())
store->destroy(*(b->second));
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 19 12:47:53 2012
@@ -1169,14 +1169,10 @@ void tryAutoDeleteImpl(Broker& broker, Q
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
- QPID_LOG(debug, "Auto-deleting " << queue->getName());
- queue->destroyed();
-
- if (broker.getManagementAgent())
- broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+ QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
<< " user:" << userId
<< " rhost:" << connectionId );
+ queue->destroyed();
}
}
@@ -1598,5 +1594,10 @@ void Queue::UsageBarrier::destroy()
while (count) usageLock.wait();
}
+void Queue::addArgument(const string& key, const types::Variant& value) {
+ settings.original.insert(types::Variant::Map::value_type(key, value));
+ if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
+}
+
}}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h Mon Nov 19 12:47:53 2012
@@ -145,7 +145,7 @@ class Queue : public boost::enable_share
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
mutable uint64_t persistenceId;
- const QueueSettings settings;
+ QueueSettings settings;
qpid::framing::FieldTable encodableSettings;
QueueDepth current;
QueueBindings bindings;
@@ -423,6 +423,10 @@ class Queue : public boost::enable_share
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
QPID_BROKER_EXTERN void setDequeueSincePurge(uint32_t value);
+
+ /** Add an argument to be included in management messages about this queue. */
+ QPID_BROKER_EXTERN void addArgument(const std::string& key, const types::Variant& value);
+
friend class QueueFactory;
};
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Nov 19 12:47:53 2012
@@ -23,10 +23,14 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <sstream>
#include <assert.h>
+namespace _qmf = qmf::org::apache::qpid::broker;
using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
@@ -44,7 +48,10 @@ QueueRegistry::declare(const string& nam
bool recovering/*true if this declare is a
result of recovering queue
definition from persistent
- record*/)
+ record*/,
+ const OwnershipToken* owner,
+ std::string connectionId,
+ std::string userId)
{
std::pair<Queue::shared_ptr, bool> result;
{
@@ -62,16 +69,31 @@ QueueRegistry::declare(const string& nam
queue->create();
}
queues[name] = queue;
+ // NOTE: raiseEvent and queueCreate must be called with the lock held in
+ // order to ensure events are generated in the correct order.
+ // Call queueCreate before raiseEvents so it can add arguments that
+ // will be included in the management event.
+ if (getBroker()) getBroker()->getConfigurationObservers().queueCreate(queue);
result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ if (getBroker() && getBroker()->getManagementAgent()) {
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDeclare(
+ connectionId, userId, name,
+ settings.durable, owner, settings.autodelete,
+ alternate ? alternate->getName() : string(),
+ result.first->getSettings().asMap(),
+ result.second ? "created" : "existing"));
+ }
}
- if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
-void QueueRegistry::destroy(const string& name) {
+void QueueRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
Queue::shared_ptr q;
{
qpid::sys::RWlock::ScopedWlock locker(lock);
@@ -79,9 +101,17 @@ void QueueRegistry::destroy(const string
if (i != queues.end()) {
q = i->second;
queues.erase(i);
+ if (getBroker()) {
+ // NOTE: queueDestroy and raiseEvent must be called with the
+ // lock held in order to ensure events are generated
+ // in the correct order.
+ getBroker()->getConfigurationObservers().queueDestroy(q);
+ if (getBroker()->getManagementAgent())
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDelete(connectionId, userId, name));
+ }
}
}
- if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Nov 19 12:47:53 2012
@@ -59,7 +59,9 @@ class QueueRegistry : QueueFactory {
const std::string& name,
const QueueSettings& settings,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
- bool recovering = false);
+ bool recovering = false,
+ const OwnershipToken* owner = 0,
+ std::string connectionId=std::string(), std::string userId=std::string());
/**
* Destroy the named queue.
@@ -73,7 +75,11 @@ class QueueRegistry : QueueFactory {
* subsequent calls to find or declare with the same name.
*
*/
- QPID_BROKER_EXTERN void destroy(const std::string& name);
+ QPID_BROKER_EXTERN void destroy(
+ const std::string& name,
+ const std::string& connectionId=std::string(),
+ const std::string& userId=std::string());
+
template <class Test> bool destroyIf(const std::string& name, Test test)
{
if (test()) {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Nov 19 12:47:53 2012
@@ -432,9 +432,9 @@ void SemanticState::cancel(ConsumerImpl:
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ // Only run auto-delete for counted consumers.
+ if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner())
Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
- }
}
c->cancel();
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Nov 19 12:47:53 2012
@@ -98,17 +98,6 @@ void SessionAdapter::ExchangeHandlerImpl
//exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
- getConnection().getUserId(),
- exchange,
- type,
- alternateExchange,
- durable,
- false,
- ManagementAgent::toMap(args),
- "existing"));
QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
<< " user:" << getConnection().getUserId()
<< " rhost:" << getConnection().getUrl()
@@ -318,11 +307,6 @@ void SessionAdapter::QueueHandlerImpl::d
if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
- "existing"));
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << getConnection().getUserId()
<< " rhost:" << getConnection().getUrl()
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Nov 19 12:47:53 2012
@@ -22,6 +22,7 @@
#include "qpid/broker/amqp/Header.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/log/Statement.h"
@@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker
return canDeliver();
}
+void Outgoing::setSubjectFilter(const std::string& f) // Stores f as this link's subject filter; filter() treats an empty value as "accept everything".
+{
+ subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target) // Steps through filter against target; returns true only when the filter ends in '#' or both iterators finish together.
+{
+ bool wild = false; // true while a preceding '#' may still absorb non-matching target words
+ while (!filter.finished())
+ {
+ if (filter.match1('*')) {
+ if (target.finished()) return false; // '*' needs a target word to consume
+ //else move to the next word in both filter and target
+ filter.next();
+ target.next();
+ } else if (filter.match1('#')) {
+ // i.e. filter word is '#' which can match a variable number of words in the target
+ filter.next();
+ if (filter.finished()) return true; // trailing '#' accepts whatever remains of the target
+ else if (target.finished()) return false; // more filter words follow but the target is exhausted
+ wild = true;
+ } else {
+ //filter word needs to match target exactly
+ if (target.finished()) return false;
+ std::string word;
+ target.pop(word); // presumably removes the current target word and copies it into word - confirm against TokenIterator
+ if (filter.match(word)) {
+ wild = false; // NOTE(review): no backtracking once a word after '#' matches; confirm cases such as '#.a.b' against 'a.a.b'
+ filter.next();
+ } else if (!wild) {
+ return false;
+ }
+ } // on a mismatch while wild, the target word is dropped and the same filter word is retried
+ }
+ return target.finished(); // filter exhausted: succeed only if no target words are left over
+}
+bool match(const std::string& filter, const std::string& target) // Convenience overload: wraps both strings in TokenIterators and defers to the iterator-based match().
+{
+ TokenIterator lhs(filter);
+ TokenIterator rhs(target);
+ return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m) // True when no subject filter is set, the routing key equals it exactly, or the wildcard match() accepts the key.
+{
+ return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()); // || short-circuits, so match() only runs when the first two tests fail
+}
+
void Outgoing::cancel() {}
void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Mon Nov 19 12:47:53 2012
@@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Co
{
public:
Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+ void setSubjectFilter(const std::string&);
void init();
bool dispatch();
void write(const char* data, size_t size);
@@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Co
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
void notify();
bool accept(const qpid::broker::Message&);
+ bool filter(const qpid::broker::Message&);
void cancel();
void acknowledged(const qpid::broker::DeliveryRecord&);
qpid::broker::OwnershipToken* getSession();
@@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Co
size_t current;
int outstanding;
std::vector<char> buffer;
+ std::string subjectFilter;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Nov 19 12:47:53 2012
@@ -26,11 +26,16 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/intrusive_ptr.hpp>
@@ -79,6 +84,31 @@ class Exchange : public Target
Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
: ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+
+Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus) // Looks name up as both an exchange and a queue; creates the node when neither exists and the terminus is dynamic.
+{ // NOTE(review): name is taken by value; a const reference would avoid a copy but the declaration in Session.h would have to change with it
+ ResolvedNode node;
+ node.exchange = broker.getExchanges().find(name);
+ node.queue = broker.getQueues().find(name);
+ if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) {
+ //dynamic creation: nothing with this name exists yet
+ //is it a queue or an exchange? decided from the terminus properties
+ NodeProperties properties;
+ properties.read(pn_terminus_properties(terminus));
+ if (properties.isQueue()) {
+ node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; // only .first of the returned pair is kept
+ } else {
+ qpid::framing::FieldTable args; // left empty and passed to createExchange as its argument table
+ node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
+ args, connection.getUserid(), connection.getId()).first;
+ }
+ } else if (node.queue && node.exchange) {
+ QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
+ node.exchange.reset(); // drop the exchange so callers see only the queue
+ }
+ return node; // both members can still be empty when nothing was found and the terminus is not dynamic
+}
+
void Session::attach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
@@ -91,32 +121,36 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), name.c_str());
- boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
- boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
- if (queue) {
- if (exchange) {
- QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
- }
- boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, false));
+ ResolvedNode node = resolve(name, source);
+ Filter filter;
+ filter.read(pn_terminus_filter(source));
+
+ if (node.queue) {
+ boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
q->init();
+ if (filter.hasSubjectFilter()) {
+ q->setSubjectFilter(filter.getSubjectFilter());
+ }
senders[link] = q;
- } else if (exchange) {
+ } else if (node.exchange) {
QueueSettings settings(false, true);
//TODO: populate settings from source details when available from engine
- queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- //TODO: bind based on filter when that is exposed by engine
- if (exchange->getType() == FanOutExchange::typeName) {
- exchange->bind(queue, std::string(), 0);
- } else if (exchange->getType() == TopicExchange::typeName) {
- exchange->bind(queue, "#", 0);
+ boost::shared_ptr<qpid::broker::Queue> queue
+ = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ if (filter.hasSubjectFilter()) {
+ filter.bind(node.exchange, queue);
+ filter.write(pn_terminus_filter(pn_link_source(link)));
+ } else if (node.exchange->getType() == FanOutExchange::typeName) {
+ node.exchange->bind(queue, std::string(), 0);
+ } else if (node.exchange->getType() == TopicExchange::typeName) {
+ node.exchange->bind(queue, "#", 0);
} else {
- throw qpid::Exception("Exchange type not yet supported over 1.0: " + exchange->getType());/*not-supported?*/
+ throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
}
boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
senders[link] = q;
q->init();
} else {
- //TODO: handle dynamic creation
pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
@@ -130,22 +164,17 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), name.c_str());
+ ResolvedNode node = resolve(name, target);
- boost::shared_ptr<qpid::broker::Queue> queue = broker.getQueues().find(name);
- boost::shared_ptr<qpid::broker::Exchange> exchange = broker.getExchanges().find(name);
- if (queue) {
- if (exchange) {
- QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
- }
- boost::shared_ptr<Target> q(new Queue(queue, link));
+ if (node.queue) {
+ boost::shared_ptr<Target> q(new Queue(node.queue, link));
targets[link] = q;
q->flow();
- } else if (exchange) {
- boost::shared_ptr<Target> e(new Exchange(exchange, link));
+ } else if (node.exchange) {
+ boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
targets[link] = e;
e->flow();
} else {
- //TODO: handle dynamic creation
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.h Mon Nov 19 12:47:53 2012
@@ -32,11 +32,14 @@
struct pn_delivery_t;
struct pn_link_t;
struct pn_session_t;
+struct pn_terminus_t;
namespace qpid {
namespace broker {
class Broker;
+class Exchange;
+class Queue;
namespace amqp {
@@ -71,6 +74,13 @@ class Session : public ManagedSession, p
std::deque<pn_delivery_t*> completed;
bool deleted;
qpid::sys::Mutex lock;
+ struct ResolvedNode // Return type of resolve(): the exchange and/or queue that a node name maps to
+ {
+ boost::shared_ptr<qpid::broker::Exchange> exchange; // empty when the name is not (or is no longer treated as) an exchange
+ boost::shared_ptr<qpid::broker::Queue> queue; // empty when the name is not a queue
+ };
+
+ ResolvedNode resolve(const std::string name, pn_terminus_t* terminus);
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Mon Nov 19 12:47:53 2012
@@ -93,9 +93,7 @@ class SslProtocolFactory : public qpid::
CredHandle credHandle;
public:
- SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port,
- int backlog, bool nodelay,
- Timer& timer, uint32_t maxTime);
+ SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
~SslProtocolFactory();
void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
@@ -129,10 +127,7 @@ static struct SslPlugin : public Plugin
if (broker) {
try {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
- "", boost::lexical_cast<std::string>(options.port),
- opts.connectionBacklog, opts.tcpNoDelay,
- broker->getTimer(), opts.maxNegotiateTime));
+ ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
@@ -142,13 +137,36 @@ static struct SslPlugin : public Plugin
}
} sslPlugin;
-SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
- const std::string& host, const std::string& port,
- int backlog, bool nodelay,
- Timer& timer, uint32_t maxTime)
+namespace {
+ // Expand list of Interfaces and addresses to a list of addresses
+ std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) { // returns a vector holding a single "" when interfaces is empty
+ std::vector<std::string> addresses;
+ // If there are no specific interfaces listed use a single "" to listen on every interface
+ if (interfaces.empty()) {
+ addresses.push_back("");
+ return addresses;
+ }
+ for (unsigned i = 0; i < interfaces.size(); ++i) {
+ const std::string& interface = interfaces[i]; // NOTE(review): `interface` is a macro in some Windows COM headers (objbase.h); confirm it cannot collide in this Windows-only file
+ if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) { // presumably appends the named interface's addresses and returns false for an unknown name - confirm
+ // We don't have an interface of that name -
+ // Check for IPv6 ('[' ']') brackets and remove them
+ // then pass to be looked up directly
+ if (interface[0]=='[' && interface[interface.size()-1]==']') { // for an empty entry the first test fails, so size()-1 is never evaluated
+ addresses.push_back(interface.substr(1, interface.size()-2));
+ } else {
+ addresses.push_back(interface);
+ }
+ }
+ }
+ return addresses;
+ }
+}
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
: brokerTimer(timer),
- maxNegotiateTime(maxTime),
- tcpNoDelay(nodelay),
+ maxNegotiateTime(opts.maxNegotiateTime),
+ tcpNoDelay(opts.tcpNoDelay),
clientAuthSelected(options.clientAuth) {
// Make sure that certificate store is good before listening to sockets
@@ -158,7 +176,7 @@ SslProtocolFactory::SslProtocolFactory(c
// Get the certificate for this server.
DWORD flags = 0;
std::string certStoreLocation = options.certStoreLocation;
- std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
if (certStoreLocation == "currentuser") {
flags = CERT_SYSTEM_STORE_CURRENT_USER;
} else if (certStoreLocation == "localmachine") {
@@ -212,21 +230,31 @@ SslProtocolFactory::SslProtocolFactory(c
::CertFreeCertificateContext(certContext);
::CertCloseStore(certStoreHandle, 0);
- // Listen to socket(s)
- SocketAddress sa(host, port);
+ std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
+ if (addresses.empty()) {
+ // We specified some interfaces, but couldn't find addresses for them
+ QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+ listeningPort = 0;
+ }
+
+ for (unsigned i = 0; i<addresses.size(); ++i) {
+ QPID_LOG(debug, "Using interface: " << addresses[i]);
+ SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
- // We must have at least one resolved address
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
- listeningPort = s->listen(sa, backlog);
- listeners.push_back(s);
- // Try any other resolved addresses
- while (sa.nextAddress()) {
+ // We must have at least one resolved address
QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = new Socket;
- s->listen(sa, backlog);
+ Socket* s = createSocket();
+ listeningPort = s->listen(sa, opts.connectionBacklog);
listeners.push_back(s);
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ QPID_LOG(info, "SSL Listening to: " << sa.asString())
+ Socket* s = createSocket();
+ s->listen(sa, opts.connectionBacklog);
+ listeners.push_back(s);
+ }
}
}
@@ -245,7 +273,7 @@ void SslProtocolFactory::established(sys
const qpid::sys::Socket& s,
sys::ConnectionCodec::Factory* f,
bool isClient) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f);
+ sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false);
if (tcpNoDelay) {
s.setTcpNoDelay();
@@ -325,7 +353,7 @@ void SslProtocolFactory::connect(sys::Po
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
- qpid::sys::Socket* socket = new qpid::sys::Socket();
+ qpid::sys::Socket* socket = createSocket();
connectFailedCallback = failed;
AsynchConnector::create(*socket,
host,
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/SslConnector.cpp Mon Nov 19 12:47:53 2012
@@ -30,8 +30,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/ssl/util.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/SecuritySettings.h"
@@ -72,20 +73,23 @@ class SslConnector : public Connector
sys::ssl::SslSocket socket;
- sys::ssl::SslIO* aio;
+ sys::AsynchConnector* connector;
+ sys::AsynchIO* aio;
std::string identifier;
Poller::shared_ptr poller;
SecuritySettings securitySettings;
~SslConnector();
- void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
- void writebuff(qpid::sys::ssl::SslIO&);
+ void readbuff(AsynchIO&, AsynchIOBufferBase*);
+ void writebuff(AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(qpid::sys::ssl::SslIO&);
- void disconnected(qpid::sys::ssl::SslIO&);
+ void eof(AsynchIO&);
+ void disconnected(AsynchIO&);
void connect(const std::string& host, const std::string& port);
+ void connected(const sys::Socket&);
+ void connectFailed(const std::string& msg);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -96,7 +100,7 @@ class SslConnector : public Connector
framing::OutputHandler* getOutputHandler();
const std::string& getIdentifier() const;
const SecuritySettings* getSecuritySettings();
- void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+ void socketClosed(AsynchIO&, const Socket&);
size_t decode(const char* buffer, size_t size);
size_t encode(char* buffer, size_t size);
@@ -164,24 +168,28 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, const std::string& port){
+void SslConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
- try {
- socket.connect(host, port);
- } catch (const std::exception& e) {
- socket.close();
- throw TransportFailure(e.what());
- }
-
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&SslConnector::connected, this, _1),
+ boost::bind(&SslConnector::connectFailed, this, _3));
closed = false;
- aio = new SslIO(socket,
- boost::bind(&SslConnector::readbuff, this, _1, _2),
- boost::bind(&SslConnector::eof, this, _1),
- boost::bind(&SslConnector::disconnected, this, _1),
- boost::bind(&SslConnector::socketClosed, this, _1, _2),
- 0, // nobuffs
- boost::bind(&SslConnector::writebuff, this, _1));
+
+ connector->start(poller);
+}
+
+void SslConnector::connected(const Socket&) {
+ connector = 0;
+ aio = AsynchIO::create(socket,
+ boost::bind(&SslConnector::readbuff, this, _1, _2),
+ boost::bind(&SslConnector::eof, this, _1),
+ boost::bind(&SslConnector::disconnected, this, _1),
+ boost::bind(&SslConnector::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&SslConnector::writebuff, this, _1));
aio->createBuffers(maxFrameSize);
identifier = str(format("[%1%]") % socket.getFullAddress());
@@ -190,6 +198,16 @@ void SslConnector::connect(const std::st
aio->start(poller);
}
+void SslConnector::connectFailed(const std::string& msg) { // Failure callback bound in connect(): logs msg, closes the socket, marks the connector closed and notifies any shutdown handler.
+ connector = 0; // forget the pointer; NOTE(review): who frees the AsynchConnector is not visible from here
+ QPID_LOG(warning, "Connect failed: " << msg);
+ socket.close();
+ if (!closed) // NOTE(review): guard is redundant with the plain assignment below
+ closed = true; // NOTE(review): connect() and close() take `lock` before touching closed, this path does not - confirm that is safe
+ if (shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
void SslConnector::close() {
Mutex::ScopedLock l(lock);
if (!closed) {
@@ -199,7 +217,7 @@ void SslConnector::close() {
}
}
-void SslConnector::socketClosed(SslIO&, const SslSocket&) {
+void SslConnector::socketClosed(AsynchIO&, const Socket&) {
if (aio)
aio->queueForDeletion();
if (shutdownHandler)
@@ -255,7 +273,7 @@ void SslConnector::send(AMQFrame& frame)
}
}
-void SslConnector::writebuff(SslIO& /*aio*/)
+void SslConnector::writebuff(AsynchIO& /*aio*/)
{
// It's possible to be disconnected and be writable
if (closed)
@@ -265,7 +283,7 @@ void SslConnector::writebuff(SslIO& /*ai
return;
}
- SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+ AsynchIOBufferBase* buffer = aio->getQueuedBuffer();
if (buffer) {
size_t encoded = encode(buffer->bytes, buffer->byteCount);
@@ -304,7 +322,7 @@ size_t SslConnector::encode(char* buffer
return bytesWritten;
}
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff)
{
int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
// TODO: unreading needs to go away, and when we can cope
@@ -343,7 +361,7 @@ size_t SslConnector::decode(const char*
}
void SslConnector::writeDataBlock(const AMQDataBlock& data) {
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ AsynchIOBufferBase* buff = aio->getQueuedBuffer();
assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -351,11 +369,11 @@ void SslConnector::writeDataBlock(const
aio->queueWrite(buff);
}
-void SslConnector::eof(SslIO&) {
+void SslConnector::eof(AsynchIO&) {
close();
}
-void SslConnector::disconnected(SslIO&) {
+void SslConnector::disconnected(AsynchIO&) {
close();
socketClosed(*aio, socket);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp Mon Nov 19 12:47:53 2012
@@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::share
closed(true),
shutdownHandler(0),
input(0),
+ socket(createSocket()),
connector(0),
aio(0),
poller(p)
{
QPID_LOG(debug, "TCPConnector created for " << version);
- settings.configureSocket(socket);
+ settings.configureSocket(*socket);
}
TCPConnector::~TCPConnector() {
@@ -88,7 +89,7 @@ void TCPConnector::connect(const std::st
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
- socket,
+ *socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
@@ -99,7 +100,7 @@ void TCPConnector::connect(const std::st
void TCPConnector::connected(const Socket&) {
connector = 0;
- aio = AsynchIO::create(socket,
+ aio = AsynchIO::create(*socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::disconnected, this, _1),
@@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO*
aio->createBuffers(maxFrameSize);
- identifier = str(format("[%1%]") % socket.getFullAddress());
+ identifier = str(format("[%1%]") % socket->getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -127,7 +128,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
QPID_LOG(warning, "Connect failed: " << msg);
- socket.close();
+ socket->close();
if (!closed)
closed = true;
if (shutdownHandler)
@@ -318,7 +319,7 @@ void TCPConnector::eof(AsynchIO&) {
void TCPConnector::disconnected(AsynchIO&) {
close();
- socketClosed(*aio, socket);
+ socketClosed(*aio, *socket);
}
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h Mon Nov 19 12:47:53 2012
@@ -35,7 +35,7 @@
#include "qpid/sys/Thread.h"
#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <deque>
#include <string>
@@ -66,7 +66,7 @@ class TCPConnector : public Connector, p
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- sys::Socket socket;
+ boost::scoped_ptr<sys::Socket> socket;
sys::AsynchConnector* connector;
sys::AsynchIO* aio;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=1411155&r1=1411154&r2=1411155&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/framing/FieldValue.cpp Mon Nov 19 12:47:53 2012
@@ -23,6 +23,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/Endian.h"
#include "qpid/framing/List.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/Msg.h"
@@ -43,7 +44,9 @@ void FieldValue::setType(uint8_t type)
data.reset(new EncodedValue<List>());
} else if (typeOctet == 0xAA) {
data.reset(new EncodedValue<Array>());
- } else {
+ } else if (typeOctet == 0x48) {
+ data.reset(new UuidData());
+ } else {
uint8_t lenType = typeOctet >> 4;
switch(lenType){
case 0:
@@ -213,9 +216,12 @@ Integer8Value::Integer8Value(int8_t v) :
Integer16Value::Integer16Value(int16_t v) :
FieldValue(0x11, new FixedWidthValue<2>(v))
{}
-UuidValue::UuidValue(const unsigned char* v) :
- FieldValue(0x48, new FixedWidthValue<16>(v))
-{}
+
+UuidData::UuidData() {} // default form, used by FieldValue::setType() for type octet 0x48
+UuidData::UuidData(const unsigned char* bytes) : FixedWidthValue<16>(bytes) {} // forwards bytes to the 16-octet fixed-width base
+bool UuidData::convertsToString() const { return true; }
+std::string UuidData::getString() const { return Uuid(rawOctets()).str(); } // renders the stored octets through Uuid::str()
+UuidValue::UuidValue(const unsigned char* v) : FieldValue(0x48, new UuidData(v)) {} // 0x48 is the same type octet that setType() maps to UuidData
void FieldValue::print(std::ostream& out) const {
data->print(out);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org