You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/31 17:48:21 UTC
svn commit: r1129767 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apol...
Author: chirino
Date: Tue May 31 15:48:20 2011
New Revision: 1129767
URL: http://svn.apache.org/viewvc?rev=1129767&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-31 : Decouple the connections from the connector that accepted them
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue May 31 15:48:20 2011
@@ -29,7 +29,7 @@ import ReporterLevel._
import security.{AclAuthorizer, Authorizer, JaasAuthenticator, Authenticator}
import java.net.InetSocketAddress
import org.apache.activemq.apollo.broker.web._
-import collection.mutable.{HashSet, LinkedHashMap}
+import collection.mutable.{HashSet, LinkedHashMap, HashMap}
import scala.util.Random
import FileSupport._
import org.apache.activemq.apollo.dto.{LogCategoryDTO, BrokerDTO}
@@ -204,6 +204,7 @@ class Broker() extends BaseService {
val virtual_hosts_by_hostname = new LinkedHashMap[AsciiBuffer, VirtualHost]()
var connectors: List[Connector] = Nil
+ val connections = HashMap[Long, BrokerConnection]()
val dispatch_queue = createQueue("broker")
@@ -366,6 +367,12 @@ class Broker() extends BaseService {
connectors.foreach( x=>
tracker.stop(x)
)
+
+ // stop the connections..
+ connections.valuesIterator.foreach { connection=>
+ tracker.stop(connection)
+ }
+
// Shutdown the virtual host services
virtual_hosts.valuesIterator.foreach( x=>
tracker.stop(x)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Tue May 31 15:48:20 2011
@@ -67,8 +67,8 @@ trait Connector extends BaseService {
def id:String
def stopped(connection:BrokerConnection):Unit
def config:ConnectorDTO
- def connections:HashMap[Long, BrokerConnection]
- def connection_counter:LongCounter
+ def accepted:LongCounter
+ def connected:LongCounter
}
@@ -87,9 +87,8 @@ class AcceptingConnector(val broker:Brok
var config:ConnectorDTO = defaultConfig
var transport_server:TransportServer = _
var protocol:Protocol = _
-
- val connections = HashMap[Long, BrokerConnection]()
- val connection_counter = new LongCounter()
+ val accepted = new LongCounter()
+ val connected = new LongCounter()
override def toString = "connector: "+config.id
@@ -103,7 +102,8 @@ class AcceptingConnector(val broker:Brok
transport.setProtocolCodec(protocol.createProtocolCodec)
}
- connection_counter.incrementAndGet
+ accepted.incrementAndGet
+ connected.incrementAndGet()
var connection = new BrokerConnection(AcceptingConnector.this, broker.connection_id_counter.incrementAndGet)
connection.dispatch_queue.setLabel("connection %d to %s".format(connection.id, transport.getRemoteAddress))
connection.protocol_handler = protocol.createProtocolHandler
@@ -111,7 +111,7 @@ class AcceptingConnector(val broker:Brok
broker.init_dispatch_queue(connection.dispatch_queue)
- connections.put(connection.id, connection)
+ broker.connections.put(connection.id, connection)
try {
connection.start()
} catch {
@@ -122,14 +122,14 @@ class AcceptingConnector(val broker:Brok
if(at_connection_limit) {
// We stop accepting connections at this point.
- info("Connection limit reached. Clients connected: %d", connections.size)
+ info("Connection limit reached. Clients connected: %d", connected.get)
transport_server.suspend
}
}
}
def at_connection_limit = {
- connections.size >= config.connection_limit.getOrElse(Integer.MAX_VALUE)
+ connected.get >= config.connection_limit.getOrElse(Integer.MAX_VALUE)
}
/**
@@ -173,11 +173,7 @@ class AcceptingConnector(val broker:Brok
override def _stop(on_completed:Runnable): Unit = {
transport_server.stop(^{
broker.console_log.info("Stopped connector at: "+config.bind)
- val tracker = new LoggingTracker(toString, broker.console_log, dispatch_queue)
- connections.valuesIterator.foreach { connection=>
- tracker.stop(connection)
- }
- tracker.callback(on_completed)
+ on_completed.run
})
}
@@ -187,7 +183,8 @@ class AcceptingConnector(val broker:Brok
*/
def stopped(connection:BrokerConnection) = dispatch_queue {
val at_limit = at_connection_limit
- if( connections.remove(connection.id).isDefined ) {
+ if( broker.connections.remove(connection.id).isDefined ) {
+ connected.decrementAndGet()
if( at_limit ) {
transport_server.resume
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectorStatusDTO.java Tue May 31 15:48:20 2011
@@ -39,9 +39,9 @@ public class ConnectorStatusDTO extends
public long accepted;
/**
- * Ids of all open connections that the connector is managing.
+ * The number of connections that this connector has currently connected.
*/
- @XmlElement(name="connection")
- public List<LongIdLabeledDTO> connections = new ArrayList<LongIdLabeledDTO>();
+ @XmlAttribute
+ public long connected;
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Tue May 31 15:48:20 2011
@@ -74,11 +74,9 @@ case class BrokerResource() extends Reso
result.connectors.add( c.id )
}
- broker.connectors.foreach{ connector=>
- connector.connections.foreach { case (id,connection) =>
- // TODO: may need to sync /w connection's dispatch queue
- result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
- }
+ broker.connections.foreach { case (id,connection) =>
+ // TODO: may need to sync /w connection's dispatch queue
+ result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
}
result
@@ -440,12 +438,8 @@ case class BrokerResource() extends Reso
result.id = connector.id.toString
result.state = connector.service_state.toString
result.state_since = connector.service_state.since
-
- result.accepted = connector.connection_counter.get
- connector.connections.foreach { case (id,connection) =>
- // TODO: may need to sync /w connection's dispatch queue
- result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress ) )
- }
+ result.accepted = connector.accepted.get
+ result.connected = connector.connected.get
result
}
@@ -461,11 +455,7 @@ case class BrokerResource() extends Reso
with_broker { broker =>
monitoring(broker) {
- val values = ListBuffer[BrokerConnection]()
- broker.connectors.foreach { connector=>
- values ++= connector.connections.values
- }
- val records = sync_all(values) { value =>
+ val records = sync_all(broker.connections.values) { value =>
value.get_connection_status
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala Tue May 31 15:48:20 2011
@@ -298,7 +298,7 @@ abstract class Resource(parent:Resource=
protected def with_connection[T](id:Long)(func: BrokerConnection=>FutureResult[T]):FutureResult[T] = {
with_broker { broker =>
- broker.connectors.flatMap{ _.connections.get(id) }.headOption match {
+ broker.connections.get(id) match {
case Some(connection:BrokerConnection) =>
sync(connection) {
func(connection)
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectorStatusDTO.jade Tue May 31 15:48:20 2011
@@ -22,12 +22,8 @@ h1 Connector: #{id}
p state: #{state} #{ uptime(state_since) } ago
-p accepted connections: #{accepted}
+p total accepted: #{accepted}
-h2 Connections
-ul
- - for( x <- connections )
- li
- a(href={ path("../../connections/"+x.id) }) #{x.label}
+p currently connected: #{connected}
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md?rev=1129767&r1=1129766&r2=1129767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/management-api.md Tue May 31 15:48:20 2011
@@ -185,10 +185,7 @@ Results in a [Connector Status](./api/ap
"state":"STARTED",
"state_since":1305553109899,
"accepted":6,
- "connections":[
- {"id":5,"label":"/127.0.0.1:52638"},
- {"id":6,"label":"/127.0.0.1:52639"}
- ]
+ "connected":2
}
{pygmentize}