You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/11/17 22:08:11 UTC
svn commit: r881517 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp
Broker.h Connection.cpp ConnectionFactory.cpp SecureConnectionFactory.cpp
Author: cctrieloff
Date: Tue Nov 17 21:08:10 2009
New Revision: 881517
URL: http://svn.apache.org/viewvc?rev=881517&view=rev
Log:
QPID-2188: support for maxConnections; the limit is set on the broker
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Nov 17 21:08:10 2009
@@ -155,6 +155,7 @@
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
expiryPolicy(new ExpiryPolicy),
+ connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Nov 17 21:08:10 2009
@@ -51,6 +51,7 @@
#include "qpid/sys/Timer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <string>
@@ -116,6 +117,26 @@
private:
std::string getHome();
};
+
+ class ConnectionCounter { // Mutex-guarded connection count that is compared against a fixed maximum.
+ int maxConnections; // limit supplied to the constructor; never modified afterwards in this class
+ int connectionCount; // current count; starts at 0
+ sys::Mutex connectionCountLock; // taken by each method below before touching connectionCount
+ public:
+ ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; // mc: maximum connection count
+ void inc_connectionCount() { // adds one to the count while holding the lock
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount++;
+ }
+ void dec_connectionCount() { // subtracts one from the count while holding the lock; no lower-bound check
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount--;
+ }
+ bool allowConnection() { // NOTE(review): despite the name, true means the limit has been reached
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ return (maxConnections <= connectionCount); // true once count >= max (so immediately when max is 0); the create() callers in this commit refuse the connection on true
+ } // the lock is released here, so this check and a later inc_connectionCount() are separate critical sections
+ };
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -147,7 +168,7 @@
std::string federationTag;
bool recovery;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-
+ ConnectionCounter connectionCounter;
public:
virtual ~Broker();
@@ -238,6 +259,8 @@
bool getRecovery() const { return recovery; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
+
+ ConnectionCounter& getConnectionCounter() {return connectionCounter;} // returns a mutable reference to this broker's connectionCounter member
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Nov 17 21:08:10 2009
@@ -102,6 +102,7 @@
}
ConnectionState::setUrl(mgmtId);
}
+ if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -125,6 +126,8 @@
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+
+ if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
}
void Connection::received(framing::AMQFrame& frame) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Nov 17 21:08:10 2009
@@ -22,6 +22,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -37,6 +38,11 @@
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
unsigned int ) {
+ if (broker.getConnectionCounter().allowConnection())
+ {
+ QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+ return 0;
+ }
if (v == ProtocolVersion(0, 10)) {
ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Nov 17 21:08:10 2009
@@ -23,6 +23,7 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/SecureConnection.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -38,6 +39,11 @@
sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
unsigned int conn_ssf ) {
+ if (broker.getConnectionCounter().allowConnection())
+ {
+ QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+ return 0;
+ }
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new amqp_0_10::Connection(out, id, false));
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org