You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [5/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.cpp Fri Aug 3 12:13:32 2012
@@ -68,54 +68,92 @@ LinkRegistry::LinkRegistry (Broker* _bro
LinkRegistry::~LinkRegistry() {}
+/** find link by the *configured* remote address */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& host,
+ uint16_t port,
+ const std::string& transport)
+{
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) {
+ Link::shared_ptr& link = i->second;
+ if (link->getHost() == host &&
+ link->getPort() == port &&
+ (transport.empty() || link->getTransport() == transport))
+ return link;
+ }
+ return boost::shared_ptr<Link>();
+}
-void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
+/** find link by name */
+boost::shared_ptr<Link> LinkRegistry::getLink(const std::string& name)
{
Mutex::ScopedLock locker(lock);
- std::string oldKey = createKey(oldAddress);
- std::string newKey = createKey(newAddress);
- if (links.find(newKey) != links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
- } else {
- LinkMap::iterator i = links.find(oldKey);
- if (i == links.end()) {
- QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
- } else {
- links[newKey] = i->second;
- links.erase(oldKey);
- QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
- }
- }
+ LinkMap::iterator l = links.find(name);
+ if (l != links.end())
+ return l->second;
+ return boost::shared_ptr<Link>();
}
-pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& host,
+pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
+ const string& host,
uint16_t port,
const string& transport,
bool durable,
const string& authMechanism,
const string& username,
- const string& password)
+ const string& password,
+ bool failover)
{
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(name);
if (i == links.end())
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
- authMechanism, username, password,
- broker, parent));
- links[key] = link;
+ link = Link::shared_ptr (
+ new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent, failover));
+ if (durable && store) store->create(*link);
+ links[name] = link;
+ QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
-pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& host,
- uint16_t port,
+/** find bridge by link & route info */
+Bridge::shared_ptr LinkRegistry::getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) {
+ if (i->second->getSrc() == src && i->second->getDest() == dest &&
+ i->second->getKey() == key && i->second->getLink() &&
+ i->second->getLink()->getName() == link.getName()) {
+ return i->second;
+ }
+ }
+ return Bridge::shared_ptr();
+}
+
+/** find bridge by name */
+Bridge::shared_ptr LinkRegistry::getBridge(const std::string& name)
+{
+ Mutex::ScopedLock locker(lock);
+ BridgeMap::iterator b = bridges.find(name);
+ if (b != bridges.end())
+ return b->second;
+ return Bridge::shared_ptr();
+}
+
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -126,22 +164,32 @@ pair<Bridge::shared_ptr, bool> LinkRegis
const std::string& excludes,
bool dynamic,
uint16_t sync,
- Bridge::InitializeCallback init
+ Bridge::InitializeCallback init,
+ const std::string& queueName,
+ const std::string& altExchange
)
{
Mutex::ScopedLock locker(lock);
- QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
-
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ // Durable bridges are only valid on durable links
+ if (durable && !link.isDurable()) {
+ QPID_LOG(error, "Can't create a durable route '" << name << "' on a non-durable link '" << link.getName());
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ if (dynamic) {
+ Exchange::shared_ptr exchange = broker->getExchanges().get(src);
+ if (exchange.get() == 0) {
+ QPID_LOG(error, "Exchange not found, name='" << src << "'" );
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ QPID_LOG(error, "Exchange type does not support dynamic routing, name='" << src << "'");
+ return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+ }
+ }
+
+ BridgeMap::iterator b = bridges.find(name);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
@@ -159,23 +207,29 @@ pair<Bridge::shared_ptr, bool> LinkRegis
args.i_sync = sync;
bridge = Bridge::shared_ptr
- (new Bridge (l->second.get(), l->second->nextChannel(),
- boost::bind(&LinkRegistry::destroy, this,
- host, port, src, dest, key),
- args, init));
- bridges[bridgeKey] = bridge;
- l->second->add(bridge);
+ (new Bridge (name, &link, link.nextChannel(),
+ boost::bind(&LinkRegistry::destroyBridge, this, _1),
+ args, init, queueName, altExchange));
+ bridges[name] = bridge;
+ link.add(bridge);
+ if (durable && store)
+ store->create(*bridge);
+
+ QPID_LOG(debug, "Bridge '" << name <<"' declared on link '" << link.getName() <<
+ "' from " << src << " to " << dest << " (" << key << ")");
+
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
-void LinkRegistry::destroy(const string& host, const uint16_t port)
+/** called back by the link when it has completed its cleanup and can be removed. */
+void LinkRegistry::linkDestroyed(Link *link)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
- string key = createKey(host, port);
- LinkMap::iterator i = links.find(key);
+ LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
if (i->second->isDurable() && store)
@@ -184,27 +238,20 @@ void LinkRegistry::destroy(const string&
}
}
-void LinkRegistry::destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key)
+/** called back by bridge when its destruction has been requested */
+void LinkRegistry::destroyBridge(Bridge *bridge)
{
+ QPID_LOG(debug, "LinkRegistry::destroy(); bridge= " << bridge->getName());
Mutex::ScopedLock locker(lock);
- string linkKey = createKey(host, port);
- stringstream keystream;
- keystream << linkKey << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = keystream.str();
- LinkMap::iterator l = links.find(linkKey);
- if (l == links.end())
- return;
-
- BridgeMap::iterator b = bridges.find(bridgeKey);
+ BridgeMap::iterator b = bridges.find(bridge->getName());
if (b == bridges.end())
return;
- l->second->cancel(b->second);
+ Link *link = b->second->getLink();
+ if (link) {
+ link->cancel(b->second);
+ }
if (b->second->isDurable())
store->destroy(*(b->second));
bridges.erase(b);
@@ -219,26 +266,71 @@ MessageStore* LinkRegistry::getStore() c
return store;
}
-Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
-{
- // Convert keyOrMgmtId to a host:port key.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
- // connection management IDs. Currently sys:: protocol factories
- // and IO plugins construct the IDs and LinkRegistry parses them.
- size_t separator = keyOrMgmtId.find('-');
- if (separator == std::string::npos) separator = 0;
- std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
+namespace {
+ void extractHostPort(const std::string& connId, std::string *host, uint16_t *port)
+ {
+ // Extract host and port of remote broker from connection id string.
+ //
+ // TODO aconway 2011-02-01: centralize code that constructs/parses connection
+ // management IDs. Currently sys:: protocol factories and IO plugins construct the
+ // IDs and LinkRegistry parses them.
+ // KAG: current connection id format assumed:
+ // "localhost:port-remotehost:port". In the case of IpV6, the host addresses are
+ // contained within brackets "[...]", example:
+ // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided to alert us
+ // if this assumption changes!
+ size_t separator = connId.find('-');
+ assert(separator != std::string::npos);
+ std::string remote = connId.substr(separator+1, std::string::npos);
+ separator = remote.rfind(":");
+ assert(separator != std::string::npos);
+ *host = remote.substr(0, separator);
+ // IPv6 - host is bracketed by "[]", strip them
+ if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
+ *host = host->substr(1, host->length() - 2);
+ }
+ try {
+ *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, std::string::npos));
+ } catch (const boost::bad_lexical_cast&) {
+ QPID_LOG(error, "Invalid format for connection identifier! '" << connId << "'");
+ assert(false);
+ }
+ }
+}
+/** find the Link that corresponds to the given connection */
+Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
+{
Mutex::ScopedLock locker(lock);
- LinkMap::iterator l = links.find(key);
- if (l != links.end()) return l->second;
- else return Link::shared_ptr();
+ ConnectionMap::iterator c = connections.find(connId);
+ if (c != connections.end()) {
+ LinkMap::iterator l = links.find(c->second);
+ if (l != links.end())
+ return l->second;
+ }
+ return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
- Link::shared_ptr link = findLink(key);
+ // find a link that is attempting to connect to the remote, and
+ // create a mapping from connection id to link
+ QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
+ std::string host;
+ uint16_t port = 0;
+ extractHostPort( key, &host, &port );
+ Link::shared_ptr link;
+ {
+ Mutex::ScopedLock locker(lock);
+ for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ if (l->second->pendingConnection(host, port)) {
+ link = l->second;
+ connections[key] = link->getName();
+ break;
+ }
+ }
+ }
+
if (link) {
link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
@@ -343,20 +435,6 @@ std::string LinkRegistry::getAuthIdentit
}
-std::string LinkRegistry::createKey(const qpid::Address& a) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- return createKey(a.host, a.port);
-}
-
-std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
- // TODO aconway 2010-05-11: key should also include protocol/transport to
- // be unique. Requires refactor of LinkRegistry interface.
- stringstream keystream;
- keystream << host << ":" << port;
- return keystream.str();
-}
-
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
@@ -369,10 +447,12 @@ void LinkRegistry::setPassive(bool p)
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
+ Mutex::ScopedLock locker(lock);
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
+ Mutex::ScopedLock locker(lock);
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/LinkRegistry.h Fri Aug 3 12:13:32 2012
@@ -42,9 +42,11 @@ namespace broker {
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+ typedef std::map<std::string, std::string> ConnectionMap;
- LinkMap links;
- BridgeMap bridges;
+ LinkMap links; /** indexed by name of Link */
+ BridgeMap bridges; /** indexed by name of Bridge */
+ ConnectionMap connections; /** indexed by connection identifier, gives link name */
qpid::sys::Mutex lock;
Broker* broker;
@@ -54,15 +56,18 @@ namespace broker {
std::string realm;
boost::shared_ptr<Link> findLink(const std::string& key);
- static std::string createKey(const Address& address);
- static std::string createKey(const std::string& host, uint16_t port);
- // Methods called by the connection observer.
+ // Methods called by the connection observer, key is connection identifier
void notifyConnection (const std::string& key, Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
- friend class LinkRegistryConnectionObserver;
+ friend class LinkRegistryConnectionObserver;
+
+ /** Notify the registry that a Link has been destroyed */
+ void linkDestroyed(Link*);
+ /** Request to destroy a Bridge */
+ void destroyBridge(Bridge*);
public:
QPID_BROKER_EXTERN LinkRegistry (); // Only used in store tests
@@ -70,17 +75,29 @@ namespace broker {
QPID_BROKER_EXTERN ~LinkRegistry();
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Link>, bool>
- declare(const std::string& host,
+ declare(const std::string& name,
+ const std::string& host,
uint16_t port,
const std::string& transport,
bool durable,
const std::string& authMechanism,
const std::string& username,
- const std::string& password);
+ const std::string& password,
+ bool failover=true);
+
+ /** find Link by name; returns an empty pointer if no such Link exists */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& name);
+ /** host,port,transport will be matched against the configured values, which may
+ be different from the current values due to failover */
+ QPID_BROKER_EXTERN boost::shared_ptr<Link>
+ getLink(const std::string& configHost,
+ uint16_t configPort,
+ const std::string& configTransport = std::string());
QPID_BROKER_EXTERN std::pair<Bridge::shared_ptr, bool>
- declare(const std::string& host,
- uint16_t port,
+ declare(const std::string& name,
+ Link& link,
bool durable,
const std::string& src,
const std::string& dest,
@@ -91,16 +108,18 @@ namespace broker {
const std::string& excludes,
bool dynamic,
uint16_t sync,
- Bridge::InitializeCallback=0
+ Bridge::InitializeCallback=0,
+ const std::string& queueName="",
+ const std::string& altExchange=""
);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host, const uint16_t port);
-
- QPID_BROKER_EXTERN void destroy(const std::string& host,
- const uint16_t port,
- const std::string& src,
- const std::string& dest,
- const std::string& key);
+ /** find Bridge by name; returns an empty pointer if no such Bridge exists */
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const std::string& name);
+ QPID_BROKER_EXTERN Bridge::shared_ptr
+ getBridge(const Link& link,
+ const std::string& src,
+ const std::string& dest,
+ const std::string& key);
/**
* Register the manageable parent for declared queues
@@ -126,11 +145,6 @@ namespace broker {
QPID_BROKER_EXTERN uint16_t getPort (const std::string& key);
/**
- * Called by links failing over to new address
- */
- void changeAddress(const Address& oldAddress, const Address& newAddress);
-
- /**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
* updated but links won't actually establish connections and
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.cpp Fri Aug 3 12:13:32 2012
@@ -384,6 +384,18 @@ void Message::addTraceId(const std::stri
}
}
+void Message::clearTrace()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (isA<MessageTransferBody>()) {
+ FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
+ std::string trace = headers.getAsString(X_QPID_TRACE);
+ if (!trace.empty()) {
+ headers.setString(X_QPID_TRACE, "");
+ }
+ }
+}
+
void Message::setTimestamp()
{
sys::Mutex::ScopedLock l(lock);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Message.h Fri Aug 3 12:13:32 2012
@@ -161,6 +161,7 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
+ void clearTrace();
void forcePersistent();
bool isForcedPersistent();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.cpp Fri Aug 3 12:13:32 2012
@@ -40,13 +40,16 @@ size_t MessageDeque::index(const framing
bool MessageDeque::deleted(const QueuedMessage& m)
{
size_t i = index(m.position);
- if (i < messages.size() && messages[i].status != QueuedMessage::DELETED) {
- messages[i].status = QueuedMessage::DELETED;
- clean();
- return true;
- } else {
- return false;
+ if (i < messages.size()) {
+ QueuedMessage *qm = &messages[i];
+ if (qm->status != QueuedMessage::DELETED) {
+ qm->status = QueuedMessage::DELETED;
+ qm->payload = 0; // message no longer needed
+ clean();
+ return true;
+ }
}
+ return false;
}
size_t MessageDeque::size()
@@ -144,6 +147,7 @@ QueuedMessage* MessageDeque::pushPtr(con
messages.back().status = QueuedMessage::AVAILABLE;
if (head >= messages.size()) head = messages.size() - 1;
++available;
+ clean(); // QPID-4046: let producer help clean the backlog of deleted messages
return &messages.back();
}
@@ -173,12 +177,37 @@ void MessageDeque::updateAcquired(const
}
}
+namespace {
+bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
+} // namespace
+
+void MessageDeque::setPosition(const framing::SequenceNumber& n) {
+ size_t i = index(n+1);
+ if (i >= messages.size()) return; // Nothing to do.
+
+ // Assertion to verify the precondition: no undeleted messages after n.
+ assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
+ messages.end());
+ messages.erase(messages.begin()+i, messages.end());
+ if (head >= messages.size()) head = messages.size() - 1;
+ // Re-count the available messages
+ available = 0;
+ for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->status == QueuedMessage::AVAILABLE) ++available;
+ }
+}
+
void MessageDeque::clean()
{
- while (messages.size() && messages.front().status == QueuedMessage::DELETED) {
+ // QPID-4046: If a queue has multiple consumers, then it is possible for a large
+ // collection of deleted messages to build up. Limit the number of messages cleaned
+ // up on each call to clean().
+ size_t count = 0;
+ while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
messages.pop_front();
- if (head) --head;
+ count += 1;
}
+ head = (head > count) ? head - count : 0;
}
void MessageDeque::foreach(Functor f)
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageDeque.h Fri Aug 3 12:13:32 2012
@@ -44,7 +44,7 @@ class MessageDeque : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
-
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.cpp Fri Aug 3 12:13:32 2012
@@ -21,6 +21,7 @@
#include "qpid/broker/MessageMap.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
namespace qpid {
namespace broker {
@@ -130,18 +131,24 @@ bool MessageMap::push(const QueuedMessag
QueuedMessage& a = messages[added.position];
a = added;
a.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Added message at " << a.position);
+ QPID_LOG(debug, "Added message " << a);
return false;
} else {
//there is already a message with that key which needs to be replaced
removed = result.first->second;
result.first->second = replace(result.first->second, added);
result.first->second.status = QueuedMessage::AVAILABLE;
- QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
+ QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
return true;
}
}
+void MessageMap::setPosition(const framing::SequenceNumber& seq) {
+ // Nothing to do, just assert that the precondition is respected and there
+ // are no undeleted messages after seq.
+ (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
+}
+
void MessageMap::foreach(Functor f)
{
for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageMap.h Fri Aug 3 12:13:32 2012
@@ -6,7 +6,7 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -50,6 +50,7 @@ class MessageMap : public Messages
virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
bool consume(QueuedMessage&);
virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
virtual void removeIf(Predicate);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Messages.h Fri Aug 3 12:13:32 2012
@@ -21,6 +21,7 @@
* under the License.
*
*/
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
namespace qpid {
@@ -101,14 +102,22 @@ class Messages
virtual void updateAcquired(const QueuedMessage&) { }
/**
+ * Set the position of the back of the queue. Next message enqueued will be n+1.
+ *@pre Any messages with seq > n must already be dequeued.
+ */
+ virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0;
+
+ /**
* Apply the functor to each message held
*/
+
virtual void foreach(Functor) = 0;
/**
* Remove every message held for which the specified
* predicate returns true
*/
virtual void removeIf(Predicate) = 0;
+
private:
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/NameGenerator.h Fri Aug 3 12:13:32 2012
@@ -32,6 +32,7 @@ namespace qpid {
NameGenerator(const std::string& base);
std::string generate();
};
+ const std::string QPID_NAME_PREFIX("qpid."); // reserved for private names
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.cpp Fri Aug 3 12:13:32 2012
@@ -121,6 +121,10 @@ void PriorityQueue::updateAcquired(const
fifo.updateAcquired(acquired);
}
+void PriorityQueue::setPosition(const framing::SequenceNumber& n) {
+ fifo.setPosition(n);
+}
+
void PriorityQueue::foreach(Functor f)
{
fifo.foreach(f);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PriorityQueue.h Fri Aug 3 12:13:32 2012
@@ -52,6 +52,7 @@ class PriorityQueue : public Messages
bool consume(QueuedMessage&);
bool push(const QueuedMessage& added, QueuedMessage& removed);
void updateAcquired(const QueuedMessage& acquired);
+ void setPosition(const framing::SequenceNumber&);
void foreach(Functor);
void removeIf(Predicate);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/PrivateImplRef.h Fri Aug 3 12:13:32 2012
@@ -88,15 +88,15 @@ template <class T> class PrivateImplRef
/** Set the implementation pointer in a handle */
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
- if (t.impl) boost::intrusive_ptr_release(t.impl);
+ if (t.impl) intrusive_ptr_release(t.impl);
t.impl = p.get();
- if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ if (t.impl) intrusive_ptr_add_ref(t.impl);
}
// Helper functions to implement the ctor, dtor, copy, assign
- static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
- static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.cpp Fri Aug 3 12:13:32 2012
@@ -49,6 +49,7 @@
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <iostream>
#include <algorithm>
@@ -67,6 +68,7 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using std::string;
using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -176,7 +178,8 @@ Queue::Queue(const string& _name, bool _
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
+ mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete);
+ mgmtObject->set_exclusive(_owner != 0);
agent->addObject(mgmtObject, 0, store != 0);
brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
if (brokerMgmtObject)
@@ -587,21 +590,51 @@ QueuedMessage Queue::get(){
return msg;
}
-bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+namespace {
+bool collectIf(QueuedMessage& qm, Messages::Predicate predicate,
+ std::deque<QueuedMessage>& collection)
{
- if (message.payload->hasExpired()) {
- expired.push_back(message);
+ if (predicate(qm)) {
+ collection.push_back(qm);
return true;
} else {
return false;
}
}
+bool isExpired(const QueuedMessage& qm) { return qm.payload->hasExpired(); }
+} // namespace
+/**
+ * Collect every message accepted by predicate into dequeued, then dequeue it.
+ *
+ * Matching messages are gathered under messageLock through
+ * messages->removeIf(). If anything was collected, the acquire counters on
+ * the management objects (when present) are increased by dequeued.size(),
+ * and each message is handed to observeAcquire() with messageLock re-taken,
+ * followed by dequeue(0, msg) once that lock has been released again.
+ *
+ *@param predicate test applied to each queued message.
+ *@param dequeued receives the matching messages. Matches are appended, so
+ * anything already in the deque is counted and dequeued as well.
+ */
+void Queue::dequeueIf(Messages::Predicate predicate,
+ std::deque<QueuedMessage>& dequeued)
+{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages->removeIf(boost::bind(&collectIf, _1, predicate, boost::ref(dequeued))); // collectIf appends each match to dequeued and returns true for it
+ }
+ if (!dequeued.empty()) {
+ if (mgmtObject) {
+ mgmtObject->inc_acquires(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires(dequeued.size());
+ }
+ for (std::deque<QueuedMessage>::const_iterator i = dequeued.begin();
+ i != dequeued.end(); ++i) {
+ {
+ // KAG: should be safe to retake lock after the removeIf, since
+ // no other thread can touch these messages after the removeIf() call
+ Mutex::ScopedLock locker(messageLock);
+ observeAcquire(*i, locker);
+ }
+ dequeue( 0, *i ); // runs after the scoped lock above has been released
+ }
+ }
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
-void Queue::purgeExpired(qpid::sys::Duration lapse)
-{
+void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
@@ -609,37 +642,18 @@ void Queue::purgeExpired(qpid::sys::Dura
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- std::deque<QueuedMessage> expired;
- {
- Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
- }
-
- if (!expired.empty()) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(boost::bind(&isExpired, _1), dequeued);
+ if (dequeued.size()) {
if (mgmtObject) {
- mgmtObject->inc_acquires(expired.size());
- mgmtObject->inc_discardsTtl(expired.size());
- if (brokerMgmtObject) {
- brokerMgmtObject->inc_acquires(expired.size());
- brokerMgmtObject->inc_discardsTtl(expired.size());
- }
- }
-
- for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
- i != expired.end(); ++i) {
- {
- // KAG: should be safe to retake lock after the removeIf, since
- // no other thread can touch these messages after the removeIf() call
- Mutex::ScopedLock locker(messageLock);
- observeAcquire(*i, locker);
- }
- dequeue( 0, *i );
+ mgmtObject->inc_discardsTtl(dequeued.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(dequeued.size());
}
}
}
}
-
namespace {
// for use with purge/move below - collect messages that match a given filter
//
@@ -797,6 +811,7 @@ uint32_t Queue::purge(const uint32_t pur
// now reroute if necessary
if (dest.get()) {
assert(qmsg->payload);
+ qmsg->payload->clearTrace();
DeliverableMessage dmsg(qmsg->payload);
dest->routeWithAlternate(dmsg);
}
@@ -888,9 +903,10 @@ void Queue::push(boost::intrusive_ptr<Me
if (mgmtObject) {
mgmtObject->inc_acquires();
mgmtObject->inc_discardsLvq();
- if (brokerMgmtObject)
+ if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires();
brokerMgmtObject->inc_discardsLvq();
+ }
}
if (isRecovery) {
//can't issue new requests for the store until
@@ -1470,12 +1486,18 @@ boost::shared_ptr<Exchange> Queue::getAl
return alternateExchange;
}
-void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
+
+ if (broker.getManagementAgent())
+ broker.getManagementAgent()->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, queue->getName()));
+ QPID_LOG_CAT(debug, model, "Delete queue. name:" << queue->getName()
+ << " user:" << userId
+ << " rhost:" << connectionId );
}
}
@@ -1483,9 +1505,11 @@ struct AutoDeleteTask : qpid::sys::Timer
{
Broker& broker;
Queue::shared_ptr queue;
+ std::string connectionId;
+ std::string userId;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
- : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {}
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q), connectionId(cId), userId(uId) {}
void fire()
{
@@ -1493,19 +1517,19 @@ struct AutoDeleteTask : qpid::sys::Timer
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
};
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
- queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, connectionId, userId, time));
broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
- tryAutoDeleteImpl(broker, queue);
+ tryAutoDeleteImpl(broker, queue, connectionId, userId);
}
}
@@ -1659,13 +1683,28 @@ void Queue::query(qpid::types::Variant::
if (allocator) allocator->query(results);
}
+namespace {
+struct After { // predicate: true for messages whose position is greater than seq
+ framing::SequenceNumber seq;
+ After(framing::SequenceNumber s) : seq(s) {}
+ bool operator()(const QueuedMessage& qm) { return qm.position > seq; }
+};
+} // namespace
+
+/**
+ * Set the queue's sequence number to n.
+ *
+ * When n is lower than the current sequence, messages positioned after n
+ * are first dequeued via dequeueIf(After(n), ...) and the message container
+ * is repositioned with messages->setPosition(n).
+ *
+ * NOTE(review): dequeueIf() acquires messageLock itself and then calls
+ * dequeue(), while messageLock is already held here. This presumably relies
+ * on Mutex being recursive -- confirm.
+ */
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
+ if (n < sequence) {
+ std::deque<QueuedMessage> dequeued;
+ dequeueIf(After(n), dequeued);
+ messages->setPosition(n);
+ }
sequence = n;
QPID_LOG(trace, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
+ Mutex::ScopedLock locker(messageLock);
return sequence;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/Queue.h Fri Aug 3 12:13:32 2012
@@ -175,6 +175,7 @@ class Queue : public boost::enable_share
void configureImpl(const qpid::framing::FieldTable& settings);
void checkNotDeleted(const Consumer::shared_ptr& c);
void notifyDeleted();
+ void dequeueIf(Messages::Predicate predicate, std::deque<QueuedMessage>& dequeued);
public:
@@ -343,7 +344,7 @@ class Queue : public boost::enable_share
* exclusive owner
*/
static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer);
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr, const std::string& connectionId, const std::string& userId);
virtual void setExternalQueueStore(ExternalQueueStore* inst);
@@ -375,12 +376,21 @@ class Queue : public boost::enable_share
std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
}
- /** Set the position sequence number for the next message on the queue.
- * Must be >= the current sequence number.
- * Used by cluster to replicate queues.
+ /**
+ * Set the sequence number for the back of the queue, the
+ * next message enqueued will be pos+1.
+ * If pos > getPosition() this creates a gap in the sequence numbers.
+ * if pos < getPosition() the back of the queue is reset to pos,
+ *
+ * The _caller_ must ensure that any messages after pos have been dequeued.
+ *
+ * Used by HA/cluster code for queue replication.
*/
QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
- /** return current position sequence number for the next message on the queue.
+
+ /**
+ *@return sequence number for the back of the queue. The next message pushed
+ * will be at getPosition+1
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueFlowLimit.cpp Fri Aug 3 12:13:32 2012
@@ -75,8 +75,8 @@ namespace {
result = v->get<int64_t>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
+ } else if (v->convertsTo<std::string>()) {
+ std::string s(v->get<std::string>());
QPID_LOG(debug, "Got string value for " << key << ": " << s);
std::istringstream convert(s);
if (convert >> result && result >= 0) return result;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueuePolicy.cpp Fri Aug 3 12:13:32 2012
@@ -133,8 +133,8 @@ T getCapacity(const FieldTable& settings
result = v->get<T>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
+ } else if (v->convertsTo<std::string>()) {
+ std::string s(v->get<std::string>());
QPID_LOG(debug, "Got string value for " << key << ": " << s);
std::istringstream convert(s);
if (convert >> result && result >= 0 && convert.eof()) return result;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.cpp Fri Aug 3 12:13:32 2012
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueEvents.h"
@@ -46,40 +47,49 @@ QueueRegistry::declare(const string& dec
definition from persistent
record*/)
{
- RWlock::ScopedWlock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
-
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
- if (alternate) {
- queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
- if (!recovering) {
- //apply settings & create persistent record if required
- queue->create(arguments);
+ Queue::shared_ptr queue;
+ std::pair<Queue::shared_ptr, bool> result;
+ {
+ RWlock::ScopedWlock locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+
+ if (i == queues.end()) {
+ queue.reset(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
+ if (alternate) {
+ queue->setAlternateExchange(alternate);//need to do this *before* create
+ alternate->incAlternateUsers();
+ }
+ if (!recovering) {
+ //apply settings & create persistent record if required
+ queue->create(arguments);
+ } else {
+ //i.e. recovering a queue for which we already have a persistent record
+ queue->configure(arguments);
+ }
+ queues[name] = queue;
+ if (lastNode) queue->setLastNodeFailure();
+ result = std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
- //i.e. recovering a queue for which we already have a persistent record
- queue->configure(arguments);
+ result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
- queues[name] = queue;
- if (lastNode) queue->setLastNodeFailure();
-
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ if (broker && queue) broker->getConfigurationObservers().queueCreate(queue);
+ return result;
}
-void QueueRegistry::destroyLH (const string& name){
- queues.erase(name);
-}
-
-void QueueRegistry::destroy (const string& name){
- RWlock::ScopedWlock locker(lock);
- destroyLH (name);
+/**
+ * Remove the queue registered under name, if there is one, and tell the
+ * broker's configuration observers that it has been destroyed.
+ * Does nothing when no queue of that name is registered.
+ *
+ *@param name name of the queue to remove from the registry.
+ */
+void QueueRegistry::destroy(const string& name) {
+ Queue::shared_ptr q;
+ {
+ qpid::sys::RWlock::ScopedWlock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i != queues.end()) {
+ // Assign to the outer q. Declaring a new local here would shadow it,
+ // leave the outer pointer null and so skip the notification below.
+ q = i->second;
+ queues.erase(i);
+ }
+ }
+ // The registry lock's scope has ended; observers are called without it.
+ if (broker && q) broker->getConfigurationObservers().queueDestroy(q);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueRegistry.h Fri Aug 3 12:13:32 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ class QueueRegistry {
QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
const std::string& name,
bool durable = false,
- bool autodelete = false,
+ bool autodelete = false,
const OwnershipToken* owner = 0,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
const qpid::framing::FieldTable& args = framing::FieldTable(),
@@ -82,9 +82,8 @@ class QueueRegistry {
QPID_BROKER_EXTERN void destroy(const std::string& name);
template <class Test> bool destroyIf(const std::string& name, Test test)
{
- qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
- destroyLH (name);
+ destroy(name);
return true;
} else {
return false;
@@ -127,13 +126,13 @@ class QueueRegistry {
for (QueueMap::const_iterator i = queues.begin(); i != queues.end(); ++i)
f(i->second);
}
-
+
/**
* Change queue mode when cluster size drops to 1 node, expands again
* in practice allows flow queue to disk when last name to be executed
*/
void updateQueueClusterState(bool lastNode);
-
+
private:
typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
@@ -144,12 +143,9 @@ private:
management::Manageable* parent;
bool lastNode; //used to set mode on queue declare
Broker* broker;
-
- //destroy impl that assumes lock is already held:
- void destroyLH (const std::string& name);
};
-
+
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri Aug 3 12:13:32 2012
@@ -144,11 +144,13 @@ RecoverableTransaction::shared_ptr Recov
RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
{
string kind;
-
+ uint32_t p = buffer.getPosition();
buffer.getShortString (kind);
- if (kind == "link")
+ buffer.setPosition(p);
+
+ if (Link::isEncodedLink(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- else if (kind == "bridge")
+ else if (Bridge::isEncodedBridge(kind))
return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.cpp Fri Aug 3 12:13:32 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
# include "config.h"
#endif
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
@@ -37,6 +38,7 @@
using qpid::sys::cyrus::CyrusSecurityLayer;
#endif
+using std::string;
using namespace qpid::framing;
using qpid::sys::SecurityLayer;
using qpid::sys::SecuritySettings;
@@ -164,13 +166,17 @@ void SaslAuthenticator::fini(void)
#endif
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c, bool isShadow )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
{
if (c.getBroker().getOptions().auth) {
- if ( isShadow )
- return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
- else
- return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ // The cluster creates non-authenticated connections for internal shadow connections
+ // that are never connected to an external client.
+ if ( !c.isAuthenticated() )
+ return std::auto_ptr<SaslAuthenticator>(
+ new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
+ else
+ return std::auto_ptr<SaslAuthenticator>(
+ new CyrusAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
} else {
QPID_LOG(debug, "SASL: No Authentication Performed");
return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c, c.getBroker().getOptions().requireEncrypted));
@@ -178,7 +184,7 @@ std::auto_ptr<SaslAuthenticator> SaslAut
}
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -214,7 +220,7 @@ void NullAuthenticator::start(const stri
} else if (i != string::npos) {
//authorization id is first null delimited field
uid = response->substr(0, i);
- }//else not a valid SASL PLAIN response, throw error?
+ }//else not a valid SASL PLAIN response, throw error?
if (!uid.empty()) {
//append realm if it has not already been added
i = uid.find(realm);
@@ -226,7 +232,12 @@ void NullAuthenticator::start(const stri
}
} else {
connection.setUserId("anonymous");
- }
+ }
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
}
@@ -240,7 +251,7 @@ std::auto_ptr<SecurityLayer> NullAuthent
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
@@ -271,17 +282,17 @@ void CyrusAuthenticator::init()
NULL, /* Callbacks */
0, /* Connection flags */
&sasl_conn);
-
+
if (SASL_OK != code) {
QPID_LOG(error, "SASL: Connection creation failed: [" << code << "] " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Unable to perform authentication");
}
sasl_security_properties_t secprops;
-
+
//TODO: should the actual SSF values be configurable here?
secprops.min_ssf = encrypt ? 10: 0;
secprops.max_ssf = 256;
@@ -319,14 +330,14 @@ void CyrusAuthenticator::init()
secprops.property_values = 0;
secprops.security_flags = 0; /* or SASL_SEC_NOANONYMOUS etc as appropriate */
/*
- * The nodict flag restricts SASL authentication mechanisms
- * to those that are not susceptible to dictionary attacks.
- * They are:
+ * The nodict flag restricts SASL authentication mechanisms
+ * to those that are not susceptible to dictionary attacks.
+ * They are:
* SRP
* PASSDSS-3DES-1
* EXTERNAL
*/
- if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
+ if (external.nodict) secprops.security_flags |= SASL_SEC_NODICTIONARY;
int result = sasl_setprop(sasl_conn, SASL_SEC_PROPS, &secprops);
if (result != SASL_OK) {
throw framing::InternalErrorException(QPID_MSG("SASL error: " << result));
@@ -371,10 +382,10 @@ void CyrusAuthenticator::getMechanisms(A
"", separator, "",
&list, &list_len,
&count);
-
+
if (SASL_OK != code) {
QPID_LOG(info, "SASL: Mechanism listing failed: " << sasl_errdetail(sasl_conn));
-
+
// TODO: Change this to an exception signaling
// server error, when one is available
throw ConnectionForcedException("Mechanism listing failed");
@@ -382,17 +393,17 @@ void CyrusAuthenticator::getMechanisms(A
string mechanism;
unsigned int start;
unsigned int end;
-
+
QPID_LOG(info, "SASL: Mechanism list: " << list);
-
+
end = 0;
do {
start = end;
-
+
// Seek to end of next mechanism
while (end < list_len && separator[0] != list[end])
end++;
-
+
// Record the mechanism
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value(string(list, start, end - start))));
end++;
@@ -404,20 +415,20 @@ void CyrusAuthenticator::start(const str
{
const char *challenge;
unsigned int challenge_len;
-
+
// This should be at same debug level as mech list in getMechanisms().
QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
int code = sasl_server_start(sasl_conn,
mechanism.c_str(),
(response ? response->c_str() : 0), (response ? response->size() : 0),
&challenge, &challenge_len);
-
+
processAuthenticationStep(code, challenge, challenge_len);
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslMechanism(mechanism);
}
-
+
void CyrusAuthenticator::step(const string& response)
{
const char *challenge;
@@ -439,10 +450,17 @@ void CyrusAuthenticator::processAuthenti
// authentication failure, when one is available
throw ConnectionForcedException("Authenticated username unavailable");
}
- QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
connection.setUserId(uid);
+ AclModule* acl = connection.getBroker().getAcl();
+ if (acl && !acl->approveConnection(connection))
+ {
+ throw ConnectionForcedException("User connection denied by configured limit");
+ }
+
+ QPID_LOG(info, connection.getMgmtId() << " SASL: Authentication succeeded for: " << uid);
+
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
} else if (SASL_CONTINUE == code) {
string challenge_str(challenge, challenge_len);
@@ -490,7 +508,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
- if ( cnxMgmt )
+ if ( cnxMgmt )
cnxMgmt->set_saslSsf(ssf);
return securityLayer;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SaslAuthenticator.h Fri Aug 3 12:13:32 2012
@@ -54,7 +54,7 @@ public:
static void init(const std::string& saslName, std::string const & saslConfigPath );
static void fini(void);
- static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection, bool isShadow);
+ static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
virtual void callUserIdCallbacks() { }
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SecureConnectionFactory.cpp Fri Aug 3 12:13:32 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,11 +41,6 @@ SecureConnectionFactory::SecureConnectio
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
- if (broker.getConnectionCounter().allowConnection())
- {
- QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refused");
- return 0;
- }
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));
@@ -71,5 +66,5 @@ SecureConnectionFactory::create(sys::Out
return sc.release();
}
-
+
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.cpp Fri Aug 3 12:13:32 2012
@@ -72,7 +72,8 @@ SemanticState::SemanticState(DeliveryAda
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
- closeComplete(false)
+ closeComplete(false),
+ connectionId(getSession().getConnection().getUrl())
{}
SemanticState::~SemanticState() {
@@ -142,6 +143,7 @@ bool SemanticState::cancel(const string&
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
+ getSession().setUnackedCount(unacked.size());
return true;
} else {
return false;
@@ -270,6 +272,7 @@ void SemanticState::checkDtxTimeout()
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
+ getSession().setUnackedCount(unacked.size());
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -426,7 +429,7 @@ void SemanticState::cancel(ConsumerImpl:
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(session.getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
}
}
c->cancel();
@@ -555,6 +558,7 @@ void SemanticState::recover(bool requeue
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +716,7 @@ void SemanticState::release(DeliveryId f
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, range.end);
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +728,7 @@ void SemanticState::reject(DeliveryId fi
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
+ getSession().setUnackedCount(unacked.size());
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +816,7 @@ void SemanticState::accepted(const Seque
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +826,7 @@ void SemanticState::completed(const Sequ
bind(&SemanticState::complete, this, _1)));
unacked.erase(removed, unacked.end());
requestDispatch();
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::attached()
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SemanticState.h Fri Aug 3 12:13:32 2012
@@ -146,6 +146,8 @@ class SemanticState : private boost::non
std::string getResumeId() const { return resumeId; };
const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
+ uint32_t getDeliveryCount() const { return deliveryCount; }
+ void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
@@ -180,6 +182,8 @@ class SemanticState : private boost::non
const bool authMsg;
const std::string userID;
bool closeComplete;
+ //needed for queue delete events in auto-delete:
+ const std::string connectionId;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.cpp Fri Aug 3 12:13:32 2012
@@ -41,6 +41,8 @@
namespace qpid {
namespace broker {
+using std::string;
+
using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
@@ -107,6 +109,12 @@ void SessionAdapter::ExchangeHandlerImpl
false,
ManagementAgent::toMap(args),
"existing"));
+ QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " type:" << type
+ << " alternateExchange:" << alternateExchange
+ << " durable:" << (durable ? "T" : "F"));
}
}catch(UnknownExchangeTypeException& /*e*/){
throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
@@ -204,7 +212,10 @@ ExchangeBoundResult SessionAdapter::Exch
}
}
-SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
+ : HandlerHelper(session), broker(getBroker()),
+ //record connection id and userid for deleting exclusive queues after session has ended:
+ connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
{}
@@ -223,7 +234,7 @@ void SessionAdapter::QueueHandlerImpl::d
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
+ Queue::tryAutoDelete(broker, q, connectionId, userId);
}
exclusiveQueues.erase(exclusiveQueues.begin());
}
@@ -307,6 +318,14 @@ void SessionAdapter::QueueHandlerImpl::d
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
"existing"));
+ QPID_LOG_CAT(debug, model, "Create queue. name:" << name
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " durable:" << (durable ? "T" : "F")
+ << " exclusive:" << (exclusive ? "T" : "F")
+ << " autodelete:" << (autoDelete ? "T" : "F")
+ << " alternateExchange:" << alternateExchange
+ );
}
}
@@ -411,6 +430,12 @@ SessionAdapter::MessageHandlerImpl::subs
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
+ QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
+ << " destination:" << destination
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl()
+ << " exclusive:" << (exclusive ? "T" : "F")
+ );
}
void
@@ -423,6 +448,9 @@ SessionAdapter::MessageHandlerImpl::canc
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+ QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
+ << " user:" << getConnection().getUserId()
+ << " rhost:" << getConnection().getUrl() );
}
void
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionAdapter.h Fri Aug 3 12:13:32 2012
@@ -121,6 +121,9 @@ class Queue;
{
Broker& broker;
std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+ //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues
+ std::string connectionId;
+ std::string userId;
public:
QueueHandlerImpl(SemanticState& session);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionContext.h Fri Aug 3 12:13:32 2012
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipT
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
+ virtual void setUnackedCount(uint64_t) {}
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.cpp Fri Aug 3 12:13:32 2012
@@ -35,23 +35,39 @@ SessionHandler::SessionHandler(Connectio
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out),
- clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
+ clusterOrderProxy(c.getClusterOrderOutput() ?
+ new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
-void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+void SessionHandler::connectionException(
+ framing::connection::CloseCode code, const std::string& msg)
+{
// NOTE: must tell the error listener _before_ calling connection.close()
- if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
+ if (connection.getErrorListener())
+ connection.getErrorListener()->connectionError(msg);
+ if (errorListener)
+ errorListener->connectionException(code, msg);
connection.close(code, msg);
}
-void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::channelException(
+ framing::session::DetachCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->channelException(code, msg);
}
-void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
- if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+void SessionHandler::executionException(
+ framing::execution::ErrorCode code, const std::string& msg)
+{
+ if (connection.getErrorListener())
+ connection.getErrorListener()->sessionError(getChannel(), msg);
+ if (errorListener)
+ errorListener->executionException(code, msg);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
@@ -64,7 +80,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
- if (detachedCallback) detachedCallback();
+ if (errorListener) errorListener->detach();
connection.closeChannel(channel.get());
}
@@ -118,8 +134,4 @@ void SessionHandler::attached(const std:
}
}
-void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
- detachedCallback = cb;
-}
-
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionHandler.h Fri Aug 3 12:13:32 2012
@@ -25,7 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
class SessionState;
@@ -43,6 +43,21 @@ class SessionState;
*/
class SessionHandler : public amqp_0_10::SessionHandler {
public:
+ class ErrorListener {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void connectionException(
+ framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(
+ framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(
+ framing::execution::ErrorCode, const std::string& msg) = 0;
+ /** Called when it is safe to delete the ErrorListener. */
+ virtual void detach() = 0;
+ };
+
+ /**
+ * @note A listener installed with setErrorListener() must not be deleted until ErrorListener::detach has been called */
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
@@ -71,7 +86,7 @@ class SessionHandler : public amqp_0_10:
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
- void setDetachedCallback(boost::function<void()> cb);
+ void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
protected:
virtual void setState(const std::string& sessionName, bool force);
@@ -94,7 +109,7 @@ class SessionHandler : public amqp_0_10:
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
- boost::function<void ()> detachedCallback;
+ boost::shared_ptr<ErrorListener> errorListener;
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.cpp Fri Aug 3 12:13:32 2012
@@ -156,7 +156,7 @@ ManagementObject* SessionState::GetManag
Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
Args& /*args*/,
- string& /*text*/)
+ std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/SessionState.h Fri Aug 3 12:13:32 2012
@@ -126,6 +126,11 @@ class SessionState : public qpid::Sessio
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
+ void setUnackedCount(uint64_t count) {
+ if (mgmtObject)
+ mgmtObject->set_unackedMessages(count);
+ }
+
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/System.cpp Fri Aug 3 12:13:32 2012
@@ -37,7 +37,6 @@ System::System (string _dataDir, Broker*
if (agent != 0)
{
- framing::Uuid systemId;
if (_dataDir.empty ())
{
@@ -66,14 +65,13 @@ System::System (string _dataDir, Broker*
}
mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
- std::string sysname, nodename, release, version, machine;
- qpid::sys::SystemInfo::getSystemId (sysname,
- nodename,
+ qpid::sys::SystemInfo::getSystemId (osName,
+ nodeName,
release,
version,
machine);
- mgmtObject->set_osName (sysname);
- mgmtObject->set_nodeName (nodename);
+ mgmtObject->set_osName (osName);
+ mgmtObject->set_nodeName (nodeName);
mgmtObject->set_release (release);
mgmtObject->set_version (version);
mgmtObject->set_machine (machine);
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/System.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/System.h Fri Aug 3 12:13:32 2012
@@ -21,6 +21,7 @@
//
#include "qpid/management/Manageable.h"
+#include "qpid/framing/Uuid.h"
#include "qmf/org/apache/qpid/broker/System.h"
#include <boost/shared_ptr.hpp>
#include <string>
@@ -35,6 +36,8 @@ class System : public management::Manage
private:
qmf::org::apache::qpid::broker::System* mgmtObject;
+ framing::Uuid systemId;
+ std::string osName, nodeName, release, version, machine;
public:
@@ -44,6 +47,20 @@ class System : public management::Manage
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
+
+
+ /** Persistent UUID assigned by the management system to this broker. */
+ framing::Uuid getSystemId() const { return systemId; }
+ /** Returns the OS name; e.g., GNU/Linux or Windows */
+ std::string getOsName() const { return osName; }
+ /** Returns the node name. Usually the same as the host name. */
+ std::string getNodeName() const { return nodeName; }
+ /** Returns the OS release identifier. */
+ std::string getRelease() const { return release; }
+ /** Returns the OS release version (kernel, build, sp, etc.) */
+ std::string getVersion() const { return version; }
+ /** Returns the hardware type. */
+ std::string getMachine() const { return machine; }
};
}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org