You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/11/17 22:08:11 UTC

svn commit: r881517 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp Broker.h Connection.cpp ConnectionFactory.cpp SecureConnectionFactory.cpp

Author: cctrieloff
Date: Tue Nov 17 21:08:10 2009
New Revision: 881517

URL: http://svn.apache.org/viewvc?rev=881517&view=rev
Log:
QPID-2188, support for maxConnections, limit is set to broker

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Nov 17 21:08:10 2009
@@ -155,6 +155,7 @@
     queueEvents(poller,!conf.asyncQueueEvents), 
     recovery(true),
     expiryPolicy(new ExpiryPolicy),
+    connectionCounter(conf.maxConnections),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (conf.enableMgmt) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Nov 17 21:08:10 2009
@@ -51,6 +51,7 @@
 #include "qpid/sys/Timer.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/sys/Mutex.h"
 
 #include <boost/intrusive_ptr.hpp>
 #include <string>
@@ -116,6 +117,26 @@
       private:
         std::string getHome();
     };
+    
+    class ConnectionCounter {  // Mutex-guarded tally of connections, compared against a fixed maximum.
+            int maxConnections;  // limit supplied to the constructor
+            int connectionCount;  // current tally; starts at 0
+            sys::Mutex connectionCountLock;  // taken by every method below before touching connectionCount
+        public:
+            ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};  // mc: the maximum the tally is compared against
+            void inc_connectionCount() {    // adds one to the tally while holding the lock
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                connectionCount++;
+            } 
+            void dec_connectionCount() {    // subtracts one from the tally while holding the lock
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                connectionCount--;
+            }
+            bool allowConnection() {  // NOTE: despite the name, returns true when the tally has REACHED the maximum
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                return (maxConnections <= connectionCount);  // true => count >= max (so a maximum of 0 is true even at a tally of 0); the create() call sites return 0 on true
+            } 
+    };
 
   private:
     typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
@@ -147,7 +168,7 @@
     std::string federationTag;
     bool recovery;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-
+    ConnectionCounter connectionCounter;
   public:
     virtual ~Broker();
 
@@ -238,6 +259,8 @@
     bool getRecovery() const { return recovery; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
+    
+    ConnectionCounter& getConnectionCounter() {return connectionCounter;}  // non-const reference to this broker's connectionCounter member
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Nov 17 21:08:10 2009
@@ -102,6 +102,7 @@
         }
         ConnectionState::setUrl(mgmtId);
     }
+    if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
 }
 
 void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -125,6 +126,8 @@
         heartbeatTimer->cancel();
     if (timeoutTimer)
         timeoutTimer->cancel();
+
+    if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
 }
 
 void Connection::received(framing::AMQFrame& frame) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Nov 17 21:08:10 2009
@@ -22,6 +22,7 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
@@ -37,6 +38,11 @@
 sys::ConnectionCodec*
 ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
                           unsigned int ) {
+    if (broker.getConnectionCounter().allowConnection())
+    {
+        QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+        return 0;
+    }
     if (v == ProtocolVersion(0, 10)) {
         ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
         c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=881517&r1=881516&r2=881517&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp Tue Nov 17 21:08:10 2009
@@ -23,6 +23,7 @@
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/SecureConnection.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
@@ -38,6 +39,11 @@
 sys::ConnectionCodec*
 SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
                                 unsigned int conn_ssf ) {
+    if (broker.getConnectionCounter().allowConnection())
+    {
+        QPID_LOG(error, "Client max connection count limit exceeded: " << broker.getOptions().maxConnections << " connection refushed");
+        return 0;
+    }
     if (v == ProtocolVersion(0, 10)) {
         SecureConnectionPtr sc(new SecureConnection());
         CodecPtr c(new amqp_0_10::Connection(out, id, false));



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org