You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/31 17:48:21 UTC

svn commit: r1129767 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apol...

Author: chirino
Date: Tue May 31 15:48:20 2011
New Revision: 1129767

URL: http://svn.apache.org/viewvc?rev=1129767&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-31 : Decouple the connections from the connector that accepted them

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue May 31 15:48:20 2011
@@ -29,7 +29,7 @@ import ReporterLevel._
 import security.{AclAuthorizer, Authorizer, JaasAuthenticator, Authenticator}
 import java.net.InetSocketAddress
 import org.apache.activemq.apollo.broker.web._
-import collection.mutable.{HashSet, LinkedHashMap}
+import collection.mutable.{HashSet, LinkedHashMap, HashMap}
 import scala.util.Random
 import FileSupport._
 import org.apache.activemq.apollo.dto.{LogCategoryDTO, BrokerDTO}
@@ -204,6 +204,7 @@ class Broker() extends BaseService {
   val virtual_hosts_by_hostname = new LinkedHashMap[AsciiBuffer, VirtualHost]()
 
   var connectors: List[Connector] = Nil
+  val connections = HashMap[Long, BrokerConnection]()
 
   val dispatch_queue = createQueue("broker")
 
@@ -366,6 +367,12 @@ class Broker() extends BaseService {
     connectors.foreach( x=>
       tracker.stop(x)
     )
+
+    // stop the connections..
+    connections.valuesIterator.foreach { connection=>
+      tracker.stop(connection)
+    }
+
     // Shutdown the virtual host services
     virtual_hosts.valuesIterator.foreach( x=>
       tracker.stop(x)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Tue May 31 15:48:20 2011
@@ -67,8 +67,8 @@ trait Connector extends BaseService {
   def id:String
   def stopped(connection:BrokerConnection):Unit
   def config:ConnectorDTO
-  def connections:HashMap[Long, BrokerConnection]
-  def connection_counter:LongCounter
+  def accepted:LongCounter
+  def connected:LongCounter
 
 }
 
@@ -87,9 +87,8 @@ class AcceptingConnector(val broker:Brok
   var config:ConnectorDTO = defaultConfig
   var transport_server:TransportServer = _
   var protocol:Protocol = _
-
-  val connections = HashMap[Long, BrokerConnection]()
-  val connection_counter = new LongCounter()
+  val accepted = new LongCounter()
+  val connected = new LongCounter()
 
   override def toString = "connector: "+config.id
 
@@ -103,7 +102,8 @@ class AcceptingConnector(val broker:Brok
         transport.setProtocolCodec(protocol.createProtocolCodec)
       }
 
-      connection_counter.incrementAndGet
+      accepted.incrementAndGet
+      connected.incrementAndGet()
       var connection = new BrokerConnection(AcceptingConnector.this, broker.connection_id_counter.incrementAndGet)
       connection.dispatch_queue.setLabel("connection %d to %s".format(connection.id, transport.getRemoteAddress))
       connection.protocol_handler = protocol.createProtocolHandler
@@ -111,7 +111,7 @@ class AcceptingConnector(val broker:Brok
 
       broker.init_dispatch_queue(connection.dispatch_queue)
 
-      connections.put(connection.id, connection)
+      broker.connections.put(connection.id, connection)
       try {
         connection.start()
       } catch {
@@ -122,14 +122,14 @@ class AcceptingConnector(val broker:Brok
 
       if(at_connection_limit) {
         // We stop accepting connections at this point.
-        info("Connection limit reached. Clients connected: %d", connections.size)
+        info("Connection limit reached. Clients connected: %d", connected.get)
         transport_server.suspend
       }
     }
   }
 
   def at_connection_limit = {
-    connections.size >= config.connection_limit.getOrElse(Integer.MAX_VALUE)
+    connected.get >= config.connection_limit.getOrElse(Integer.MAX_VALUE)
   }
 
   /**
@@ -173,11 +173,7 @@ class AcceptingConnector(val broker:Brok
   override def _stop(on_completed:Runnable): Unit = {
     transport_server.stop(^{
       broker.console_log.info("Stopped connector at: "+config.bind)
-      val tracker = new LoggingTracker(toString, broker.console_log, dispatch_queue)
-      connections.valuesIterator.foreach { connection=>
-        tracker.stop(connection)
-      }
-      tracker.callback(on_completed)
+      on_completed.run
     })
   }
 
@@ -187,7 +183,8 @@ class AcceptingConnector(val broker:Brok
    */
   def stopped(connection:BrokerConnection) = dispatch_queue {
     val at_limit = at_connection_limit
-    if( connections.remove(connection.id).isDefined ) {
+    if( broker.connections.remove(connection.id).isDefined ) {
+      connected.decrementAndGet()
       if( at_limit ) {
         transport_server.resume
       }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java Tue May 31 15:48:20 2011
@@ -39,9 +39,9 @@ public class ConnectorStatusDTO extends 
     public long accepted;
 
     /**
-     * Ids of all open connections that the connector is managing.
+     * The number of connections that this connector has currently connected.
      */
-    @XmlElement(name="connection")
-    public List<LongIdLabeledDTO> connections = new ArrayList<LongIdLabeledDTO>();
+    @XmlAttribute
+    public long connected;
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Tue May 31 15:48:20 2011
@@ -74,11 +74,9 @@ case class BrokerResource() extends Reso
           result.connectors.add( c.id )
         }
 
-        broker.connectors.foreach{ connector=>
-          connector.connections.foreach { case (id,connection) =>
-            // TODO: may need to sync /w connection's dispatch queue
-            result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
-          }
+        broker.connections.foreach { case (id,connection) =>
+          // TODO: may need to sync /w connection's dispatch queue
+          result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
         }
         result
 
@@ -440,12 +438,8 @@ case class BrokerResource() extends Reso
             result.id = connector.id.toString
             result.state = connector.service_state.toString
             result.state_since = connector.service_state.since
-
-            result.accepted = connector.connection_counter.get
-            connector.connections.foreach { case (id,connection) =>
-              // TODO: may need to sync /w connection's dispatch queue
-              result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
-            }
+            result.accepted = connector.accepted.get
+            result.connected = connector.connected.get
 
             result
         }
@@ -461,11 +455,7 @@ case class BrokerResource() extends Reso
     with_broker { broker =>
       monitoring(broker) {
 
-        val values = ListBuffer[BrokerConnection]()
-        broker.connectors.foreach { connector=>
-          values ++= connector.connections.values
-        }
-        val records = sync_all(values) { value =>
+        val records = sync_all(broker.connections.values) { value =>
           value.get_connection_status
         }
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala Tue May 31 15:48:20 2011
@@ -298,7 +298,7 @@ abstract class Resource(parent:Resource=
 
   protected def with_connection[T](id:Long)(func: BrokerConnection=>FutureResult[T]):FutureResult[T] = {
     with_broker { broker =>
-      broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
+      broker.connections.get(id) match {
         case Some(connection:BrokerConnection) =>
           sync(connection) {
             func(connection)

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade Tue May 31 15:48:20 2011
@@ -22,12 +22,8 @@ h1 Connector: #{id}
 
 p state: #{state} #{ uptime(state_since) } ago
 
-p accepted connections: #{accepted}
+p total accepted: #{accepted}
 
-h2 Connections
-ul
-  - for( x <- connections )
-    li
-      a(href={ path("../../connections/"+x.id) }) #{x.label}
+p currently connected: #{connected}
 
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md Tue May 31 15:48:20 2011
@@ -185,10 +185,7 @@ Results in a [Connector Status](./api/ap
   "state":"STARTED",
   "state_since":1305553109899,
   "accepted":6,
-  "connections":[
-    {"id":5,"label":"/127.0.0.1:52638"},
-    {"id":6,"label":"/127.0.0.1:52639"}
-  ]
+  "connected":2
 }
 {pygmentize}